7.0.0
This commit is contained in:
@@ -3,7 +3,7 @@ from datetime import datetime
|
||||
from enum import Enum
|
||||
import json
|
||||
import re
|
||||
from typing import Any, Callable, Iterable, Literal, Optional
|
||||
from typing import Any, Callable, Iterable, Literal, Optional, Tuple
|
||||
import time
|
||||
|
||||
import jwt
|
||||
@@ -156,6 +156,15 @@ class DecortController(object):
|
||||
ARCXDU = 'ARCXDU'
|
||||
CXDRAU = 'CXDRAU'
|
||||
|
||||
|
||||
class VMNetType(Enum):
|
||||
VINS = 'VINS'
|
||||
EXTNET = 'EXTNET'
|
||||
VFNIC = 'VFNIC'
|
||||
EMPTY = 'EMPTY'
|
||||
DPDK = 'DPDK'
|
||||
|
||||
|
||||
class MESSAGES:
|
||||
@staticmethod
|
||||
def ssl_error(url: None | str = None):
|
||||
@@ -298,14 +307,15 @@ class DecortController(object):
|
||||
self.result = {'failed': False, 'changed': False, 'waypoints': "Init"}
|
||||
|
||||
self.authenticator = arg_amodule.params['authenticator']
|
||||
self.controller_url = arg_amodule.params['controller_url']
|
||||
self.controller_url = arg_amodule.params.get('controller_url')
|
||||
|
||||
self.jwt = arg_amodule.params['jwt']
|
||||
self.jwt = arg_amodule.params.get('jwt')
|
||||
self.app_id = arg_amodule.params['app_id']
|
||||
self.app_secret = arg_amodule.params['app_secret']
|
||||
self.oauth2_url = arg_amodule.params['oauth2_url']
|
||||
# self.password = arg_amodule.params['password']
|
||||
# self.user = arg_amodule.params['user']
|
||||
self.domain = arg_amodule.params['domain']
|
||||
self.password = arg_amodule.params['password']
|
||||
self.username = arg_amodule.params['username']
|
||||
self.session_key = ''
|
||||
# self.iconf = arg_iconf
|
||||
self.verify_ssl = arg_amodule.params['verify_ssl']
|
||||
@@ -328,18 +338,12 @@ class DecortController(object):
|
||||
self.result['msg'] = ("JWT based authentication requested, but no JWT specified. "
|
||||
"Use 'jwt' parameter or set 'DECORT_JWT' environment variable")
|
||||
self.amodule.fail_json(**self.result)
|
||||
elif self.authenticator == "legacy":
|
||||
if not self.password:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = ("Legacy user authentication requested, but no password specified. "
|
||||
"Use 'password' parameter or set 'DECORT_PASSWORD' environment variable.")
|
||||
self.amodule.fail_json(**self.result)
|
||||
if not self.user:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = ("Legacy user authentication requested, but no user specified. "
|
||||
"Use 'user' parameter or set 'DECORT_USER' environment variable.")
|
||||
self.amodule.fail_json(**self.result)
|
||||
elif self.authenticator == "oauth2":
|
||||
elif self.authenticator in ('oauth2', 'decs3o', 'bvs'):
|
||||
if self.authenticator == 'oauth2':
|
||||
self.result['warning'] = (
|
||||
'"oauth2" authenticator type is deprecated and might be '
|
||||
'removed in newer versions. Please use "decs3o" instead.'
|
||||
)
|
||||
if not self.app_id:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = ("Oauth2 based authentication requested, but no application ID specified. "
|
||||
@@ -355,6 +359,28 @@ class DecortController(object):
|
||||
self.result['msg'] = ("Oauth2 base authentication requested, but no Oauth2 provider URL specified. "
|
||||
"Use 'oauth2_url' parameter or set 'DECORT_OAUTH2_URL' environment variable.")
|
||||
self.amodule.fail_json(**self.result)
|
||||
if self.authenticator == 'bvs':
|
||||
if not self.domain:
|
||||
self.message(
|
||||
'BVS authentication requested, but no domain '
|
||||
'specified. Use "domain" parameter or set '
|
||||
'"DECORT_DOMAIN" environment variable.'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
if not self.password:
|
||||
self.message(
|
||||
'BVS authentication requested, but no password '
|
||||
'specified. Use "password" parameter or set '
|
||||
'"DECORT_PASSWORD" environment variable.'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
if not self.username:
|
||||
self.message(
|
||||
'BVS authentication requested, but no user specified. '
|
||||
'Use "user" parameter or set "DECORT_USER" '
|
||||
'environment variable.'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
else:
|
||||
# Unknown authenticator type specified - notify and exit
|
||||
self.result['failed'] = True
|
||||
@@ -368,15 +394,12 @@ class DecortController(object):
|
||||
self.validate_jwt() # this call will abort the script if validation fails
|
||||
jwt_decoded = jwt.decode(self.jwt, algorithms=["ES384"], options={"verify_signature": False})
|
||||
self.decort_username = jwt_decoded['username'] + "@" + jwt_decoded['iss']
|
||||
elif self.authenticator == "legacy":
|
||||
# obtain session id from the DECORT controller and thus validate the the legacy user
|
||||
self.validate_legacy_user() # this call will abort the script if validation fails
|
||||
self.decort_username = self.user
|
||||
else:
|
||||
# self.authenticator == "oauth2" - Oauth2 based authorization mode
|
||||
# Oauth2 based authorization mode
|
||||
# obtain JWT from Oauth2 provider and validate on the DECORT controller
|
||||
self.obtain_oauth2_jwt()
|
||||
self.validate_jwt() # this call will abort the script if validation fails
|
||||
self.obtain_jwt()
|
||||
if self.controller_url is not None:
|
||||
self.validate_jwt() # this call will abort the script if validation fails
|
||||
jwt_decoded = jwt.decode(self.jwt, algorithms=["ES384"], options={"verify_signature": False})
|
||||
self.decort_username = jwt_decoded['username'] + "@" + jwt_decoded['iss']
|
||||
|
||||
@@ -441,7 +464,7 @@ class DecortController(object):
|
||||
return int(datetime(**datetime_args).timestamp())
|
||||
|
||||
@staticmethod
|
||||
def sec_to_dt_str(sec: int, str_format: str = '%Y-%m-%d_%H-%M-%S') -> str:
|
||||
def sec_to_dt_str(sec: float, str_format: str = '%Y-%m-%d_%H-%M-%S') -> str:
|
||||
"""
|
||||
Convert the Unix-time int to the datetime string of the format
|
||||
from `str_format`.
|
||||
@@ -466,13 +489,17 @@ class DecortController(object):
|
||||
),
|
||||
authenticator=dict(
|
||||
type='str',
|
||||
required=True,
|
||||
choices=['oauth2', 'jwt']
|
||||
choices=['oauth2', 'jwt', 'bvs', 'decs3o'],
|
||||
default='decs3o',
|
||||
),
|
||||
controller_url=dict(
|
||||
type='str',
|
||||
required=True
|
||||
),
|
||||
domain=dict(
|
||||
type='str',
|
||||
fallback=(env_fallback, ['DECORT_DOMAIN']),
|
||||
),
|
||||
jwt=dict(
|
||||
type='str',
|
||||
fallback=(env_fallback, ['DECORT_JWT']),
|
||||
@@ -482,14 +509,35 @@ class DecortController(object):
|
||||
type='str',
|
||||
fallback=(env_fallback, ['DECORT_OAUTH2_URL'])
|
||||
),
|
||||
password=dict(
|
||||
type='str',
|
||||
fallback=(env_fallback, ['DECORT_PASSWORD']),
|
||||
),
|
||||
username=dict(
|
||||
type='str',
|
||||
fallback=(env_fallback, ['DECORT_USERNAME']),
|
||||
),
|
||||
verify_ssl=dict(
|
||||
type='bool',
|
||||
default=True
|
||||
),
|
||||
),
|
||||
required_if=[
|
||||
('authenticator', 'oauth2',
|
||||
('oauth2_url', 'app_id', 'app_secret')),
|
||||
(
|
||||
'authenticator', 'oauth2',
|
||||
('oauth2_url', 'app_id', 'app_secret'),
|
||||
),
|
||||
(
|
||||
'authenticator', 'decs3o',
|
||||
('oauth2_url', 'app_id', 'app_secret'),
|
||||
),
|
||||
(
|
||||
'authenticator', 'bvs',
|
||||
(
|
||||
'oauth2_url', 'app_id', 'app_secret',
|
||||
'domain', 'username', 'password',
|
||||
),
|
||||
),
|
||||
('authenticator', 'jwt', ('jwt',))
|
||||
],
|
||||
)
|
||||
@@ -570,8 +618,14 @@ class DecortController(object):
|
||||
else:
|
||||
return True
|
||||
|
||||
def obtain_oauth2_jwt(self):
|
||||
"""Obtain JWT from the Oauth2 provider using application ID and application secret provided , as specified at
|
||||
def obtain_jwt(self):
|
||||
if self.authenticator in ('oauth2', 'decs3o'):
|
||||
self.jwt = self.obtain_decs3o_jwt()
|
||||
elif self.authenticator == 'bvs':
|
||||
self.jwt = self.obtain_bvs_jwt()
|
||||
|
||||
def obtain_decs3o_jwt(self):
|
||||
"""Obtain JWT from the Oauth2 DECS3O provider using application ID and application secret provided , as specified at
|
||||
class instance init method.
|
||||
|
||||
If method fails to obtain JWT it will abort the execution of the script by calling AnsibleModule.fail_json()
|
||||
@@ -620,6 +674,64 @@ class DecortController(object):
|
||||
self.jwt = token_get_resp.content.decode('utf8')
|
||||
return self.jwt
|
||||
|
||||
def obtain_bvs_jwt(self):
|
||||
"""
|
||||
Obtain JWT from the Oauth2 BVS provider using
|
||||
application ID, application secret, username and password provided
|
||||
|
||||
If method fails to obtain JWT it will abort the execution of the script
|
||||
by calling self.exit(fail=True).
|
||||
|
||||
@return: JWT as string.
|
||||
"""
|
||||
token_get_url = (
|
||||
f'{self.oauth2_url}/realms/{self.domain}/'
|
||||
f'protocol/openid-connect/token'
|
||||
)
|
||||
request_data = {
|
||||
'grant_type': 'password',
|
||||
'client_id': self.app_id,
|
||||
'client_secret': self.app_secret,
|
||||
'username': self.username,
|
||||
'password': self.password,
|
||||
'response_type': 'token',
|
||||
'scope': 'openid',
|
||||
}
|
||||
|
||||
token_get_resp = None
|
||||
try:
|
||||
token_get_resp = requests.post(
|
||||
url=token_get_url,
|
||||
data=request_data,
|
||||
verify=self.verify_ssl,
|
||||
)
|
||||
except requests.exceptions.SSLError:
|
||||
self.message(self.MESSAGES.ssl_error(url=token_get_url))
|
||||
self.exit(fail=True)
|
||||
except requests.exceptions.ConnectionError as con_error:
|
||||
self.message(
|
||||
f'Failed to connect to "{token_get_url}" '
|
||||
f'to obtain JWT access token: {con_error}'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
except requests.exceptions.Timeout as timeout_error:
|
||||
self.message(
|
||||
f'Timeout when trying to connect to "{token_get_url}" '
|
||||
f'to obtain JWT access token: {timeout_error}'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
if token_get_resp.status_code != 200:
|
||||
self.message(
|
||||
f'Failed to obtain JWT access token from '
|
||||
f'oauth2_url "{token_get_url}" '
|
||||
f'for app_id "{self.amodule.params["app_id"]}": '
|
||||
f'HTTP status code {token_get_resp.status_code}, '
|
||||
f'reason "{token_get_resp.reason}"'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
self.jwt = token_get_resp.json()['access_token']
|
||||
return self.jwt
|
||||
|
||||
def validate_jwt(self, arg_jwt=None):
|
||||
"""Validate JWT against DECORT controller. JWT can be supplied as argument to this method. If None supplied as
|
||||
argument, JWT will be taken from class attribute. DECORT controller URL will always be taken from the class
|
||||
@@ -632,15 +744,6 @@ class DecortController(object):
|
||||
AnsibleModule.fail_json() method.
|
||||
"""
|
||||
|
||||
if self.authenticator not in ('oauth2', 'jwt'):
|
||||
# sanity check - JWT is relevant in oauth2 or jwt authentication modes only
|
||||
self.result['msg'] = "Cannot validate JWT for incompatible authentication mode '{}'".format(
|
||||
self.authenticator)
|
||||
self.amodule.fail_json(**self.result)
|
||||
# The above call to fail_json will abort the script, so the following return statement will
|
||||
# never be executed
|
||||
return False
|
||||
|
||||
if not arg_jwt:
|
||||
# If no JWT is passed as argument to this method, we will validate JWT stored in the class
|
||||
# instance (if any)
|
||||
@@ -686,58 +789,6 @@ class DecortController(object):
|
||||
# If we fall through here, then everything went well.
|
||||
return True
|
||||
|
||||
def validate_legacy_user(self):
|
||||
"""Validate legacy user by obtaining a session key, which will be used for authenticating subsequent API calls
|
||||
to DECORT controller.
|
||||
If successful, the session key is stored in self.session_key and True is returned. If unsuccessful for any
|
||||
reason, the method will abort.
|
||||
|
||||
@return: True on successful validation of the legacy user.
|
||||
"""
|
||||
|
||||
if self.authenticator != 'legacy':
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = "Cannot validate legacy user for incompatible authentication mode '{}'".format(
|
||||
self.authenticator)
|
||||
self.amodule.fail_json(**self.result)
|
||||
return False
|
||||
|
||||
req_url = self.controller_url + "/restmachine/cloudapi/user/authenticate"
|
||||
req_data = dict(username=self.user,
|
||||
password=self.password, )
|
||||
|
||||
try:
|
||||
api_resp = requests.post(req_url, data=req_data, verify=self.verify_ssl)
|
||||
except requests.exceptions.SSLError:
|
||||
self.message(self.MESSAGES.ssl_error(url=req_url))
|
||||
self.exit(fail=True)
|
||||
except requests.exceptions.ConnectionError:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = "Failed to connect to '{}' while validating legacy user".format(req_url)
|
||||
self.amodule.fail_json(**self.result)
|
||||
return False # actually, this directive will never be executed as fail_json exits the script
|
||||
except requests.exceptions.Timeout:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = "Timeout when trying to connect to '{}' while validating legacy user".format(req_url)
|
||||
self.amodule.fail_json(**self.result)
|
||||
return False
|
||||
|
||||
if api_resp.status_code != 200:
|
||||
self.result['failed'] = True
|
||||
self.result['msg'] = ("Failed to validate legacy user access to DECORT controller URL '{}': "
|
||||
"HTTP status code {}, reason '{}'").format(req_url,
|
||||
api_resp.status_code,
|
||||
api_resp.reason)
|
||||
self.amodule.fail_json(**self.result)
|
||||
return False
|
||||
|
||||
# Assign session key to the corresponding class attribute.
|
||||
# Note that the above API call returns session key as a string with double quotes, which we need to
|
||||
# remove before it can be used as 'session=...' parameter to DECORT controller API calls
|
||||
self.session_key = api_resp.content.decode('utf8').replace('"', '')
|
||||
|
||||
return True
|
||||
|
||||
def decort_api_call(
|
||||
self,
|
||||
arg_req_function,
|
||||
@@ -779,10 +830,7 @@ class DecortController(object):
|
||||
|
||||
req_url = self.controller_url + arg_api_name
|
||||
|
||||
if self.authenticator == 'legacy':
|
||||
arg_params['authkey'] = self.session_key
|
||||
elif self.authenticator in ('jwt', 'oauth2'):
|
||||
http_headers['Authorization'] = 'bearer {}'.format(self.jwt)
|
||||
http_headers['Authorization'] = 'bearer {}'.format(self.jwt)
|
||||
|
||||
while retry_counter > 0:
|
||||
try:
|
||||
@@ -1479,15 +1527,14 @@ class DecortController(object):
|
||||
ifaces_for_delete = ifaces
|
||||
nets_for_attach = new_networks
|
||||
else:
|
||||
EMPTY = 'EMPTY'
|
||||
|
||||
# Creating dictionaries in which the key is a net type + a net id
|
||||
# + (optional) postfix
|
||||
# For empty networks a net id is the number of the empty network
|
||||
ifaces_dict = {}
|
||||
empty_ifaces_count = 0
|
||||
for iface in ifaces:
|
||||
net_type = iface['netType']
|
||||
if net_type == EMPTY:
|
||||
if net_type == self.VMNetType.EMPTY.value:
|
||||
empty_ifaces_count += 1
|
||||
net_id = empty_ifaces_count
|
||||
else:
|
||||
@@ -1497,13 +1544,22 @@ class DecortController(object):
|
||||
new_nets_dict = {}
|
||||
empty_new_nets_count = 0
|
||||
for net in new_networks:
|
||||
net_type = net["type"]
|
||||
if net_type == EMPTY:
|
||||
net_type = net['type']
|
||||
if net_type == self.VMNetType.EMPTY.value:
|
||||
empty_new_nets_count += 1
|
||||
net_id = empty_new_nets_count
|
||||
else:
|
||||
net_id = net['id']
|
||||
net_key = f'{net_type}{net_id}'
|
||||
|
||||
# If DPDK iface MTU is new then add postfix
|
||||
if net_type == self.VMNetType.DPDK.value:
|
||||
net_mtu = net['mtu']
|
||||
if net_mtu is not None:
|
||||
iface = ifaces_dict.get(net_key)
|
||||
if iface and net_mtu != iface['mtu']:
|
||||
net_key = f'{net_key}_new-mtu'
|
||||
|
||||
new_nets_dict[net_key] = net
|
||||
|
||||
# The networks that no need to be disconnected or reconnected
|
||||
@@ -1569,6 +1625,7 @@ class DecortController(object):
|
||||
'netType': net['type'],
|
||||
'netId': net.get('id') or 0,
|
||||
'ipAddr': net.get('ip_addr'),
|
||||
'mtu': net.get('mtu'),
|
||||
},
|
||||
)
|
||||
self.set_changed()
|
||||
@@ -1925,6 +1982,7 @@ class DecortController(object):
|
||||
hp_backed: Optional[bool] = None,
|
||||
numa_affinity: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
auto_start: Optional[bool] = None,
|
||||
):
|
||||
OBJ = 'compute'
|
||||
|
||||
@@ -1939,6 +1997,7 @@ class DecortController(object):
|
||||
'hpBacked': hp_backed,
|
||||
'numaAffinity': numa_affinity,
|
||||
'desc': description,
|
||||
'autoStart': auto_start,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1951,6 +2010,7 @@ class DecortController(object):
|
||||
'hp_backed': hp_backed,
|
||||
'numa_affinity': numa_affinity,
|
||||
'description': description,
|
||||
'auto_start': auto_start,
|
||||
}
|
||||
for param, value in params_to_check.items():
|
||||
if value is not None:
|
||||
@@ -6474,3 +6534,136 @@ class DecortController(object):
|
||||
self.result['changed'] = True
|
||||
|
||||
return
|
||||
|
||||
@waypoint
|
||||
@checkmode
|
||||
def snapshot_create(self, compute_id: int, label: str):
|
||||
"""
|
||||
Implementation of functionality of the API method
|
||||
`/cloudapi/compute/snapshotCreate`.
|
||||
"""
|
||||
self.decort_api_call(
|
||||
arg_req_function=requests.post,
|
||||
arg_api_name='/restmachine/cloudapi/compute/snapshotCreate',
|
||||
arg_params={
|
||||
'computeId': compute_id,
|
||||
'label': label,
|
||||
},
|
||||
)
|
||||
self.set_changed()
|
||||
self.message(
|
||||
f'Snapshot {label} for VM ID {compute_id} created successfully.'
|
||||
)
|
||||
|
||||
@waypoint
|
||||
@checkmode
|
||||
def snapshot_delete(
|
||||
self,
|
||||
compute_id: int,
|
||||
label: str,
|
||||
):
|
||||
"""
|
||||
Implementation of functionality of the API method
|
||||
`/cloudapi/compute/snapshotDelete`.
|
||||
|
||||
The method `self.exit(fail=True)` will be
|
||||
called if the time for scheduling or for deleting a snapshot
|
||||
is exceeded, or if an error occurs during deletion.
|
||||
"""
|
||||
snapshot_delete_response = self.decort_api_call(
|
||||
arg_req_function=requests.post,
|
||||
arg_api_name='/restmachine/cloudapi/compute/snapshotDelete',
|
||||
arg_params={
|
||||
'computeId': compute_id,
|
||||
'label': label,
|
||||
'asyncMode': True,
|
||||
},
|
||||
)
|
||||
self.set_changed()
|
||||
audit_id = snapshot_delete_response.text.strip('"')
|
||||
task_link = (
|
||||
f'{self.controller_url}/portal/#/system/tasks/{audit_id}'
|
||||
)
|
||||
params = {'auditId': audit_id}
|
||||
|
||||
task_schedule_timeout = 600
|
||||
snapshot_delete_timeout = 120
|
||||
sleep_interval = 5
|
||||
while True:
|
||||
task_response = self.decort_api_call(
|
||||
arg_req_function=requests.post,
|
||||
arg_api_name='/restmachine/cloudapi/tasks/get',
|
||||
arg_params=params,
|
||||
)
|
||||
response_data = task_response.json()
|
||||
match response_data['status']:
|
||||
case 'SCHEDULED':
|
||||
if task_schedule_timeout <= 0:
|
||||
self.message(
|
||||
f'Time to schedule task to delete snapshot for '
|
||||
f'VM ID {compute_id} has been exceeded.'
|
||||
f'\nTask details: {task_link}'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
task_schedule_timeout -= sleep_interval
|
||||
case 'PROCESSING':
|
||||
if snapshot_delete_timeout <= 0:
|
||||
self.message(
|
||||
f'Time to delete snapshot for VM ID {compute_id} '
|
||||
f'has been exceeded.'
|
||||
f'\nTask details: {task_link}'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
snapshot_delete_timeout -= sleep_interval
|
||||
case 'ERROR':
|
||||
self.result['msg'] = (
|
||||
f'Deleting snapshot for VM ID {compute_id} failed:'
|
||||
f'{response_data["error"]}.'
|
||||
f'\nTask details: {task_link}'
|
||||
)
|
||||
self.exit(fail=True)
|
||||
case 'OK':
|
||||
self.message(
|
||||
f'Snapshot {label} for VM ID {compute_id} '
|
||||
f'deleted successfully.'
|
||||
)
|
||||
break
|
||||
time.sleep(sleep_interval)
|
||||
|
||||
@waypoint
|
||||
def snapshot_list(self, compute_id: int) -> list:
|
||||
"""
|
||||
Implementation of functionality of the API method
|
||||
`/cloudapi/compute/snapshotList`.
|
||||
"""
|
||||
snapshots_list_response = self.decort_api_call(
|
||||
arg_req_function=requests.post,
|
||||
arg_api_name='/restmachine/cloudapi/compute/snapshotList',
|
||||
arg_params={
|
||||
'computeId': compute_id,
|
||||
}
|
||||
)
|
||||
return snapshots_list_response.json()['data']
|
||||
|
||||
@waypoint
|
||||
def snapshot_usage(
|
||||
self,
|
||||
compute_id: int,
|
||||
label: Optional[str],
|
||||
) -> tuple[dict, list[dict]]:
|
||||
"""
|
||||
Implementation of functionality of the API method
|
||||
`/cloudapi/compute/snapshotUsage`.
|
||||
"""
|
||||
snapshot_usage_response = self.decort_api_call(
|
||||
arg_req_function=requests.post,
|
||||
arg_api_name='/restmachine/cloudapi/compute/snapshotUsage',
|
||||
arg_params={
|
||||
'computeId': compute_id,
|
||||
'label': label,
|
||||
}
|
||||
)
|
||||
snapshot_usage = snapshot_usage_response.json()
|
||||
common_snapshot_usage_info = snapshot_usage[0]
|
||||
snapshots_usage = snapshot_usage[1:]
|
||||
return common_snapshot_usage_info, snapshots_usage
|
||||
|
||||
Reference in New Issue
Block a user