Funcionalidade Nova: implementação das validações nativas de header e query parameter no API Gateway. Não faz a validação de tipos, apenas as obrigatoriedades

This commit is contained in:
2024-06-27 01:48:43 -03:00
parent 706454f2ba
commit 1871cad962
2 changed files with 85 additions and 18 deletions

View File

@@ -8,6 +8,7 @@ import time
from itertools import groupby
import yaml
import datetime
import ast
#### IDCS Routines
#### https://docs.oracle.com/en/learn/apigw-modeldeployment/index.html#introduction
@@ -184,7 +185,35 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
methods = [item["METHOD"]]
path_prefix = item["PATH_PREFIX"]
callback_url = ("https://" + host + item["PATH_PREFIX"] + "validation-callback" + item["PATH"]).replace("{", "${request.path[").replace("}", "]}")
item_policy = []
item_query = []
item_header = []
if (item["SCHEMA_BODY_VALIDATION"] != ""):
item_policy.append(oci.apigateway.models.SetHeaderPolicyItem(
name="body_schema_validation",
values=[item["SCHEMA_BODY_VALIDATION"]],
if_exists="APPEND"))
if (item["SCHEMA_QUERY_VALIDATION"] != ""):
item_policy.append(oci.apigateway.models.SetHeaderPolicyItem(
name="query_schema_validation",
values=[item["SCHEMA_QUERY_VALIDATION"]],
if_exists="APPEND"))
try:
for items in ast.literal_eval(item["SCHEMA_QUERY_VALIDATION"]):
if (items["in"] == "query"):
item_query.append(oci.apigateway.models.QueryParameterValidationItem(
name=items["name"],
required=items["required"]
))
if (items["in"] == "header"):
item_header.append(oci.apigateway.models.HeaderValidationItem(
name=items["name"],
required=items["required"]
))
except:
print("NO")
if (item["SCHEMA_BODY_VALIDATION"] != "" or item["SCHEMA_QUERY_VALIDATION"] != ""):
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
@@ -197,6 +226,23 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
source="EXAMPLE-source-Value",
type="EXAMPLE-type-Value")]))
header_transformation = None
query_parameter_validation = None
header_validation = None
if (len(item_policy) >0):
header_transformation=oci.apigateway.models.HeaderTransformationPolicy(
set_headers=oci.apigateway.models.SetHeaderPolicy(
items=item_policy))
if (len(item_query) > 0):
query_parameter_validation=oci.apigateway.models.QueryParameterValidationRequestPolicy(
parameters=item_query
)
if (len(item_header) > 0):
header_validation=oci.apigateway.models.HeaderValidationRequestPolicy(
headers=item_header
)
routes.append(
oci.apigateway.models.ApiSpecificationRoute(
path=item["PATH"],
@@ -206,14 +252,9 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
is_ssl_verify_disabled=False),
methods=methods,
request_policies=oci.apigateway.models.ApiSpecificationRouteRequestPolicies(
header_transformations=oci.apigateway.models.HeaderTransformationPolicy(
set_headers=oci.apigateway.models.SetHeaderPolicy(
items=[
oci.apigateway.models.SetHeaderPolicyItem(
name="body_schema_validation",
values=[item["SCHEMA_BODY_VALIDATION"]],
if_exists="APPEND")]),
)
header_transformations=header_transformation,
query_parameter_validations=query_parameter_validation,
header_validations=header_validation
)))
new_routes.append(
oci.apigateway.models.ApiSpecificationRoute(
@@ -224,17 +265,11 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
is_ssl_verify_disabled=False),
methods=methods,
request_policies=oci.apigateway.models.ApiSpecificationRouteRequestPolicies(
header_transformations=oci.apigateway.models.HeaderTransformationPolicy(
set_headers=oci.apigateway.models.SetHeaderPolicy(
items=[
oci.apigateway.models.SetHeaderPolicyItem(
name="body_schema_validation",
values=[item["SCHEMA_BODY_VALIDATION"]],
if_exists="APPEND")]),
)
header_transformations=header_transformation,
query_parameter_validations=query_parameter_validation,
header_validations=header_validation
)
))
else:
routes.append(
oci.apigateway.models.ApiSpecificationRoute(
@@ -268,6 +303,7 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
'token': 'request.headers[token]',
'body': 'request.body',
'body_schema_validation': 'request.headers[body_schema_validation]',
'query_schema_validation': 'request.headers[query_schema_validation]',
'opc-request-id': 'request.headers[opc-request-id]'},
cache_key=["token", "opc-request-id"],
validation_failure_policy=oci.apigateway.models.ModifyResponseValidationFailurePolicy(
@@ -293,6 +329,7 @@ def applyAuthApi(compartmentId, displayName, payload, functionId, host, api_gate
'token': 'request.headers[token]',
'body': 'request.body',
'body_schema_validation': 'request.headers[body_schema_validation]',
'query_schema_validation': 'request.headers[query_schema_validation]',
'opc-request-id': 'request.headers[opc-request-id]'},
cache_key=["token", "opc-request-id"],
validation_failure_policy=oci.apigateway.models.ModifyResponseValidationFailurePolicy(
@@ -355,6 +392,7 @@ def verify_path(json_data_list):
'PATH': item2["PATH_PREFIX"],
'ENDPOINT': item2["ENDPOINT"],
'SCHEMA_BODY_VALIDATION': item2["SCHEMA_BODY_VALIDATION"],
'SCHEMA_QUERY_VALIDATION': item2["SCHEMA_QUERY_VALIDATION"],
'CONTENT_TYPE': item2["CONTENT_TYPE"]
})
else:
@@ -367,6 +405,7 @@ def verify_path(json_data_list):
'PATH': item2["PATH"],
'ENDPOINT': item2["ENDPOINT"],
'SCHEMA_BODY_VALIDATION': item2["SCHEMA_BODY_VALIDATION"],
'SCHEMA_QUERY_VALIDATION': item2["SCHEMA_QUERY_VALIDATION"],
'CONTENT_TYPE': item2["CONTENT_TYPE"]
})
@@ -462,6 +501,31 @@ def process_api_spec(api_id, compartmentId, environment, swagger, functionId, ho
except:
SCHEMA_BODY_VALIDATION = ""
CONTENT_TYPE = ""
# 2024-06-26 - Query Parameter
if (version == "3"):
try:
try:
reference = str(fullSpec["paths"][spec["path"]][str(spec["methods"][0]).lower()]["parameters"])
SCHEMA_QUERY_VALIDATION = reference #+ "," + api_id
except:
reference = str(fullSpec["paths"][spec["path"]][str(spec["methods"][0]).lower()]["parameters"])
SCHEMA_QUERY_VALIDATION = reference
CONTENT_TYPE = "application/json"
except:
SCHEMA_QUERY_VALIDATION = ""
CONTENT_TYPE = ""
else:
SCHEMA_QUERY_VALIDATION = ""
CONTENT_TYPE = ""
try:
reference = str(fullSpec["paths"][spec["path"]][str(spec["methods"][0]).lower()]["parameters"])
SCHEMA_QUERY_VALIDATION = reference #+ "," + api_id
CONTENT_TYPE = "application/json"
except:
SCHEMA_QUERY_VALIDATION = ""
CONTENT_TYPE = ""
TYPE = type
ENVIRONMENT = environment
json_data_list.append({
@@ -473,9 +537,11 @@ def process_api_spec(api_id, compartmentId, environment, swagger, functionId, ho
'PATH': PATH,
'ENDPOINT': ENDPOINT,
'SCHEMA_BODY_VALIDATION': SCHEMA_BODY_VALIDATION,
'SCHEMA_QUERY_VALIDATION': SCHEMA_QUERY_VALIDATION,
'CONTENT_TYPE': CONTENT_TYPE
})
print(API_NAME, TYPE, ENVIRONMENT, METHOD, PATH_PREFIX, PATH, ENDPOINT, SCHEMA_BODY_VALIDATION, CONTENT_TYPE)
print(API_NAME, TYPE, ENVIRONMENT, METHOD, PATH_PREFIX, PATH, ENDPOINT, SCHEMA_BODY_VALIDATION, SCHEMA_QUERY_VALIDATION, CONTENT_TYPE)
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
@@ -493,6 +559,7 @@ def process_api_spec(api_id, compartmentId, environment, swagger, functionId, ho
'PATH': PATH,
'ENDPOINT': ENDPOINT,
'SCHEMA_BODY_VALIDATION': SCHEMA_BODY_VALIDATION,
'SCHEMA_QUERY_VALIDATION': SCHEMA_QUERY_VALIDATION,
'CONTENT_TYPE': CONTENT_TYPE
}),
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],