This commit is contained in:
2024-11-29 13:48:11 +03:00
parent dd2fca15f3
commit 54c306b13b
34 changed files with 9090 additions and 1972 deletions

View File

@@ -1,33 +1,4 @@
#
# Digital Energy Cloud Orchestration Technology (DECORT) modules for Ansible
# Copyright: (c) 2018-2023 Digital Energy Cloud Solutions LLC
#
# Apache License 2.0 (see http://www.apache.org/licenses/LICENSE-2.0.txt)
#
"""
This is the library of utility functions and classes for managing DECORT cloud platform.
These classes are made aware of Ansible module architecture and designed to be called from the main code of an Ansible
module to fulfill cloud resource management tasks.
Methods defined in this file should NOT implement complex logic like building execution plan for an
upper level Ansible module. However, they do implement necessary sanity checks and may abort upper level module
execution if some fatal error occurs. Being Ansible aware, they usually do so by calling AnsibleModule.fail_json(...)
method with properly configured arguments.
NOTE: this utility library requires DECORT platform version 3.4.0 or higher.
It is not compatible with older versions.
Requirements:
- python >= 3.8
- PyJWT Python module
- requests Python module
- netaddr Python module
- DECORT cloud platform version 3.8.6 or higher
"""
from copy import deepcopy
from datetime import datetime
from enum import Enum
import json
@@ -1095,11 +1066,6 @@ class DecortController(object):
self.result['warning'] = ("compute_bootdisk_size(): new size {} is the same as current for "
"Compute ID {}, nothing to do.").format(new_size, comp_dict['id'])
return
elif new_size < bdisk_size:
self.result['failed'] = False
self.result['warning'] = ("compute_bootdisk_size(): new size {} is less than current {} for "
"Compute ID {}, skipping change.").format(new_size, bdisk_size, comp_dict['id'])
return
api_params = dict(diskId=bdisk_id,
size=new_size)
@@ -1405,11 +1371,15 @@ class DecortController(object):
boot_disk,
image_id,
chipset: Literal['Q35', 'i440fx'] = 'i440fx',
annotation="",
description="",
userdata=None,
sep_id=None,
pool_name=None,
start_on_create=True):
start_on_create=True,
cpu_pin: bool = False,
hp_backed: bool = False,
numa_affinity: Literal['none', 'loose', 'strict'] = 'none',
custom_fields: Optional[dict] = None):
"""Manage KVM VM provisioning. To remove existing KVM VM compute instance use compute_remove method,
to resize use compute_resize, to manage power state use compute_powerstate method.
@@ -1419,7 +1389,7 @@ class DecortController(object):
@param (int) ram: volume of RAM in MB to allocate (i.e. pass 4096 to allocate 4GB RAM).
@param (int) boot_disk: boot disk size in GB.
@param (int) image_id: ID of the OS image to base this Compute on.
@param (string) annotation: optional text description for the VM.
@param (string) description: optional text description for the VM.
@param (string) userdata: additional parameters to pass to cloud-init facility of the guest OS.
@param (bool) start_on_create: set to False if you want the VM to be provisioned in HALTED state.
@@ -1447,8 +1417,8 @@ class DecortController(object):
'chipset': chipset,
'withoutBootDisk': not boot_disk,
}
if annotation:
api_params['decs'] = annotation
if description:
api_params['desc'] = description
if not image_id:
api_url = '/restmachine/cloudapi/kvmx86/createBlank'
@@ -1456,7 +1426,12 @@ class DecortController(object):
api_url = '/restmachine/cloudapi/kvmx86/create'
api_params['imageId'] = image_id
api_params['start'] = start_on_create
api_params['cpupin'] = cpu_pin
api_params['hpBacked'] = hp_backed
api_params['numaAffinity'] = numa_affinity
if custom_fields is not None:
api_params['customFields'] = json.dumps(custom_fields)
if userdata:
api_params['userdata'] = json.dumps(userdata) # we need to pass a string object as "userdata"
@@ -1925,28 +1900,104 @@ class DecortController(object):
@waypoint
@checkmode
def compute_update(self, compute_id: int, name: Optional[str] = None):
def compute_update(
self,
compute_id: int,
name: Optional[str] = None,
chipset: Optional[str] = None,
cpu_pin: Optional[bool] = None,
hp_backed: Optional[bool] = None,
numa_affinity: Optional[str] = None,
description: Optional[str] = None,
):
OBJ = 'compute'
api_response = self.decort_api_call(
self.decort_api_call(
arg_req_function=requests.post,
arg_api_name='/restmachine/cloudapi/compute/update',
arg_params={
'computeId': compute_id,
'name': name,
'chipset': chipset,
'cpupin': cpu_pin,
'hpBacked': hp_backed,
'numaAffinity': numa_affinity,
'desc': description,
},
)
self.set_changed()
if name is not None:
self.message(
self.MESSAGES.obj_renamed(
obj=OBJ,
id=compute_id,
new_name=name,
params_to_check = {
'name': name,
'chipset': chipset,
'cpu_pin': cpu_pin,
'hp_backed': hp_backed,
'numa_affinity': numa_affinity,
'description': description,
}
for param, value in params_to_check.items():
if value is not None:
self.message(
self.MESSAGES.obj_smth_changed(
obj=OBJ,
id=compute_id,
smth=param,
new_value=value,
)
)
)
@waypoint
@checkmode
def compute_set_custom_fields(self, compute_id: int, custom_fields: dict):
    """Send custom fields for a Compute to the setCustomFields API.

    @param (int) compute_id: ID of the Compute, passed as "computeId".
    @param (dict) custom_fields: fields to set; serialized with
    json.dumps() and passed as "customFields".

    After the API call the controller is marked as changed through
    self.set_changed().
    NOTE(review): presumably @checkmode bypasses the body in Ansible check
    mode - confirm against the decorator definition.
    """
    # The API parameter is sent as a JSON string, not as a nested object.
    request_params = {
        'computeId': compute_id,
        'customFields': json.dumps(custom_fields),
    }
    self.decort_api_call(
        arg_req_function=requests.post,
        arg_api_name='/restmachine/cloudapi/compute/setCustomFields',
        arg_params=request_params,
    )
    self.set_changed()
@waypoint
@checkmode
def compute_disable_custom_fields(self, compute_id: int):
    """Call the deleteCustomFields API for a Compute.

    @param (int) compute_id: ID of the Compute, passed as "computeId".

    After the API call the controller is marked as changed through
    self.set_changed().
    NOTE(review): presumably @checkmode bypasses the body in Ansible check
    mode - confirm against the decorator definition.
    """
    request_params = {'computeId': compute_id}
    self.decort_api_call(
        arg_req_function=requests.post,
        arg_api_name='/restmachine/cloudapi/compute/deleteCustomFields',
        arg_params=request_params,
    )
    self.set_changed()
@waypoint
def compute_get_custom_fields(self, compute_id: int) -> Optional[dict]:
    """Fetch custom fields of a Compute via the getCustomFields API.

    @param (int) compute_id: ID of the Compute, passed as "computeId".

    @return: the decoded JSON body of the API response, or None when the
    API answers 404 and the "error" text of the response mentions
    'customFields'. Any other 404 has its error text passed to
    self.message() and then self.exit(fail=True) is called.
    """
    response = self.decort_api_call(
        arg_req_function=requests.post,
        arg_api_name='/restmachine/cloudapi/compute/getCustomFields',
        arg_params={
            'computeId': compute_id
        },
        # 404 is inspected below instead of being handled by the API helper.
        not_fail_codes=[404],
    )

    if response.status_code != 404:
        return response.json()

    error_text = response.json()['error']
    if 'customFields' in error_text:
        return None

    self.message(error_text)
    self.exit(fail=True)
    # Reached only if self.exit() returns control to the caller.
    return response.json()
###################################
# OS image manipulation methods
@@ -4928,21 +4979,44 @@ class DecortController(object):
self.result['changed'] = True
return
def k8s_check_new_worker_groups_params(
self,
worker_groups: list[dict[str, Any]]
def k8s_check_worker_group_for_recreate(
self,
target_wg: dict[str, Any],
existing_wg: dict[str, Any],
):
for param in [
'cpu', 'ram', 'disk', 'taints', 'labels', 'annotations',
]:
# Ignore service label when comparing labels
if param == 'labels':
if target_wg[param] is not None:
filtered_existing_wg_labels = [
label for label in existing_wg[param]
if 'workersGroupName' not in label
]
if (
sorted(filtered_existing_wg_labels)
!= sorted(target_wg[param])
):
target_wg['need_to_recreate'] = True
elif (
target_wg[param] is not None
and existing_wg[param] != target_wg[param]
):
target_wg['need_to_recreate'] = True
if (
target_wg['ci_user_data'] is not None
and not target_wg.get('need_to_recreate')
):
check_error = False
for new_wg_params in worker_groups:
for new_wg_required_param in ['num', 'cpu', 'ram', 'disk']:
if new_wg_params[new_wg_required_param] is None:
self.message(
f'Parameter "{new_wg_required_param}" is required'
f' for new worker group "{new_wg_params["name"]}"'
)
check_error = True
if check_error:
self.exit(fail=True)
_, vm_info, _ = self._compute_get_by_id(
comp_id=existing_wg['detailedInfo'][0]['id'],
)
if (
vm_info.get('userdata', {})
!= target_wg['ci_user_data']
):
target_wg['need_to_recreate'] = True
def k8s_provision(self, k8s_name,
k8ci_id,
@@ -4966,41 +5040,25 @@ class DecortController(object):
kubeproxy_conf,
join_conf,
oidc_cert,
annotation,
description,
extnet_only,
master_chipset: Literal['Q35', 'i440fx'] = 'i440fx',
):
self.result['waypoints'] = "{} -> {}".format(self.result['waypoints'], "k8s_provision")
self.k8s_check_new_worker_groups_params(worker_groups=[default_worker])
if self.amodule.check_mode:
self.result['failed'] = False
self.result['msg'] = ("k8s_provision() in check mode. Provision k8s '{}' in RG ID {} "
"was requested.").format(k8s_name, rg_id)
return 0
def_wg_name = default_worker['name']
def_wg_count = default_worker['num']
def_wg_cpu = default_worker['cpu']
def_wg_ram = default_worker['ram']
def_wg_disk = default_worker['disk']
def_wg_sepid = default_worker['sep_id']
def_wg_pool = default_worker['pool']
def_wg_ud = (
json.dumps(default_worker['ci_user_data'])
if default_worker['ci_user_data'] else None
)
def_wg_lab = default_worker['labels']
def_wg_taints = default_worker['taints']
def_wg_ann = default_worker['annotations']
api_url = "/restmachine/cloudapi/k8s/create"
api_params = dict(name=k8s_name,
rgId=rg_id,
k8ciId=k8ci_id,
vinsId=vins_id,
workerGroupName=def_wg_name,
workerGroupName=default_worker['name'],
networkPlugin=plugin,
masterNum=master_count,
masterCpu=master_cpu,
@@ -5008,15 +5066,15 @@ class DecortController(object):
masterDisk=master_disk,
masterSepId=master_sepid,
masterSepPool=master_pool,
workerNum=def_wg_count,
workerCpu=def_wg_cpu,
workerRam=def_wg_ram,
workerDisk=def_wg_disk,
workerSepId=def_wg_sepid,
workerSepPool=def_wg_pool,
labels=def_wg_lab,
taints=def_wg_taints,
annotations=def_wg_ann,
workerNum=default_worker['num'],
workerCpu=default_worker['cpu'],
workerRam=default_worker['ram'],
workerDisk=default_worker['disk'],
workerSepId=default_worker['sep_id'],
workerSepPool=default_worker['pool'],
labels=default_worker['labels'],
taints=default_worker['taints'],
annotations=default_worker['annotations'],
extnetId=extnet_id,
withLB=with_lb,
highlyAvailableLB=ha_lb,
@@ -5026,9 +5084,10 @@ class DecortController(object):
kubeletConfiguration=json.dumps(kublet_conf) if kublet_conf else None,
kubeProxyConfiguration=json.dumps(kubeproxy_conf)if kubeproxy_conf else None,
joinConfiguration=json.dumps(join_conf)if join_conf else None,
desc=annotation,
userData=def_wg_ud,
desc=description,
userData=json.dumps(default_worker['ci_user_data']),
extnetOnly=extnet_only,
chipset=master_chipset,
)
upload_files = None
@@ -5104,23 +5163,64 @@ class DecortController(object):
for rec in arg_modwg:
if rec['name'] not in wg_inner:
wg_add_list.append(rec)
self.k8s_check_new_worker_groups_params(worker_groups=wg_add_list)
for rec_inn in arg_k8swg['k8sGroups']['workers']:
for rec_out in arg_modwg:
if rec_inn['name'] == rec_out['name']:
if (
rec_out['num'] is not None
and rec_inn['num'] != rec_out['num']
):
count = rec_inn['num']-rec_out['num']
cmp_list = []
if count > 0:
for cmp in rec_inn['detailedInfo'][-count:]:
cmp_list.append(cmp['id'])
wg_moddel_list.append({rec_inn['id']:cmp_list})
if count < 0:
wg_modadd_list.append({rec_inn['id']:abs(count)})
for wg in arg_k8swg['k8sGroups']['workers']:
for target_wg in arg_modwg:
if wg['name'] == target_wg['name']:
self.k8s_check_worker_group_for_recreate(
target_wg=target_wg,
existing_wg=wg,
)
if target_wg.get('need_to_recreate'):
wg_del_list.append(wg['id'])
wg_to_create = deepcopy(target_wg)
for param, value in wg_to_create.items():
if param == 'ci_user_data' and value is None:
_, vm_info, _ = self._compute_get_by_id(
comp_id=wg['detailedInfo'][0]['id'],
)
wg_to_create[param] = vm_info.get(
'userdata', {}
)
elif value is None:
wg_to_create[param] = wg.get(param)
wg_add_list.append(wg_to_create)
continue
w_ids = {w['id'] for w in wg['detailedInfo']}
bad_w_ids = set()
new_chipset = target_wg['chipset']
if new_chipset is not None:
for w_id in w_ids:
_, vm_info, _ = self._compute_get_by_id(
comp_id=w_id,
)
if vm_info['chipset'] != new_chipset:
bad_w_ids.add(w_id)
wg_num = wg['num']
target_num = target_wg['num']
if target_num is not None:
new_w_count = target_num - wg_num + len(bad_w_ids)
if new_w_count > 0:
if new_chipset is None:
new_chipset = self.wg_default_params['chipset']
wg_modadd_list.append({
'wg_id': wg['id'],
'computes_num': new_w_count,
'chipset': new_chipset,
})
elif new_w_count:
valid_w_ids = w_ids.difference(bad_w_ids)
for _ in range(abs(new_w_count)):
bad_w_ids.add(valid_w_ids.pop())
if bad_w_ids:
wg_moddel_list.append({
'wg_id': wg['id'],
'compute_ids': bad_w_ids,
})
if wg_del_list:
for wgid in wg_del_list:
@@ -5129,22 +5229,24 @@ class DecortController(object):
self.result['changed'] = True
if wg_add_list:
for wg in wg_add_list:
wg_to_create = deepcopy(wg)
for param, default_value in self.wg_default_params.items():
if wg_to_create[param] is None:
wg_to_create[param] = default_value
api_params = {
'k8sId': self.k8s_id,
'name': wg['name'],
'workerNum': wg['num'],
'workerCpu': wg['cpu'],
'workerRam': wg['ram'],
'workerDisk': wg['disk'],
'workerSepId': wg['sep_id'],
'workerSepPool': wg['pool'],
'labels': wg['labels'],
'taints': wg['taints'],
'annotations': wg['annotations'],
'userData': (
json.dumps(wg['ci_user_data'])
if wg['ci_user_data'] else None
),
'name': wg_to_create['name'],
'workerNum': wg_to_create['num'],
'workerCpu': wg_to_create['cpu'],
'workerRam': wg_to_create['ram'],
'workerDisk': wg_to_create['disk'],
'workerSepId': wg_to_create['sep_id'],
'workerSepPool': wg_to_create['pool'],
'labels': wg_to_create['labels'],
'taints': wg_to_create['taints'],
'annotations': wg_to_create['annotations'],
'userData': json.dumps(wg_to_create['ci_user_data']),
'chipset': wg_to_create['chipset'],
}
wg_add_response = self.decort_api_call(
arg_req_function=requests.post,
@@ -5160,7 +5262,7 @@ class DecortController(object):
# average time to add a single worker group * reserve
wg_add_avg_time = 210 * 2
wg_add_timeout = wg_add_avg_time * wg['num']
wg_add_timeout = wg_add_avg_time * wg_to_create['num']
task_schedule_timeout = 600
sleep_interval = 5
while True:
@@ -5174,9 +5276,9 @@ class DecortController(object):
case 'SCHEDULED':
if task_schedule_timeout <= 0:
self.message(
f'Time to schedule task'
f'to add worker group {wg["name"]}'
f'has been exceeded.'
f'Time to schedule task to add worker '
f'group {wg_to_create["name"]} has been '
f'exceeded.'
f'\nTask details: {task_link}'
)
self.exit(fail=True)
@@ -5185,38 +5287,54 @@ class DecortController(object):
case 'PROCESSING':
if wg_add_timeout <= 0:
self.message(
f'Time to add worker group {wg["name"]} '
f'has been exceeded.'
f'\nTask details: {task_link}'
f'Time to add worker group '
f'{wg_to_create["name"]} has been '
f'exceeded.\nTask details: {task_link}'
)
self.exit(fail=True)
time.sleep(sleep_interval)
wg_add_timeout -= sleep_interval
case 'ERROR':
self.result['msg'] = (
f'Adding worker group {wg["name"]} failed: '
f'{response_data["error"]}.'
f'Adding worker group {wg_to_create["name"]} '
f'failed: {response_data["error"]}.'
f'\nTask details: {task_link}'
)
self.exit(fail=True)
case 'OK':
self.message(
f'Worker group {wg["name"]} created successful'
f'Worker group {wg_to_create["name"]} '
f'created successful'
)
break
if wg_modadd_list:
for wg in wg_modadd_list:
for key in wg:
api_params = dict(k8sId=self.k8s_id,workersGroupId=key,num=wg[key])
api_resp = self.decort_api_call(requests.post, "/restmachine/cloudapi/k8s/workerAdd", api_params)
self.result['changed'] = True
api_params = {
'k8sId': self.k8s_id,
'workersGroupId': wg['wg_id'],
'num': wg['computes_num'],
'chipset': wg['chipset'],
}
self.decort_api_call(
arg_req_function=requests.post,
arg_api_name='/restmachine/cloudapi/k8s/workerAdd',
arg_params=api_params,
)
self.result['changed'] = True
if wg_moddel_list:
for wg in wg_moddel_list:
for key in wg:
for cmpid in wg[key]:
api_params = dict(k8sId=self.k8s_id,workersGroupId=key,workerId=cmpid)
api_resp = self.decort_api_call(requests.post, "/restmachine/cloudapi/k8s/deleteWorkerFromGroup", api_params)
self.result['changed'] = True
for compute_id in wg['compute_ids']:
api_params = {
'k8sId': self.k8s_id,
'workersGroupId': wg['wg_id'],
'workerId': compute_id,
}
self.decort_api_call(
arg_req_function=requests.post,
arg_api_name='/restmachine/cloudapi/k8s/deleteWorkerFromGroup',
arg_params=api_params,
)
self.result['changed'] = True
self.result['failed'] = False
return
@@ -5688,7 +5806,7 @@ class DecortController(object):
self.amodule.fail_json(**self.result)
return 0, None
def lb_provision(self,lb_name,rg_id,vins_id,ext_net_id,ha_status,annotation,start=True):
def lb_provision(self,lb_name,rg_id,vins_id,ext_net_id,ha_status,description,start=True):
"""Provision LB according to the specified arguments.
If critical error occurs the embedded call to API function will abort further execution of
the script and relay error to Ansible.
@@ -5717,7 +5835,7 @@ class DecortController(object):
vinsId=vins_id,
highlyAvailable=ha_status,
start=start,
desc=annotation
desc=description
)
api_resp = self.decort_api_call(requests.post, api_url, api_params)
# On success the above call will return here. On error it will abort execution by calling fail_json.