Funcionalidade Adicional: Autenticação por Resource Principal e Vault.

This commit is contained in:
2024-09-02 08:41:31 -03:00
parent 270487ddb5
commit 32aa6cd977
3 changed files with 316 additions and 2 deletions

View File

@@ -173,10 +173,19 @@ with
![img_3.png](images/resourceprincipal-4.png)
This is the code to change:
signer = oci.auth.signers.get_resource_principals_signer()
logging = oci.loggingingestion.LoggingClient(config={}, signer=signer)
See the [authRPApi.py](./files/authApi/authRPApi.py) code with the changes from **OCI Private Key** and **config** files authorization to **OCI Resource Principal** authorization. Remember to rename the **authRPApi.py** to **func.py** and build your function to test.
## Vault Secret
Another way to not expose sensitive data is using **OCI Vault**.
You can configure a **Vault** to store your sensitive data like passwords, endpoints, etc.
You can configure a [Vault](https://www.ateam-oracle.com/post/using-the-oci-instance-principals-and-vault-with-python-to-retrieve-a-secret) to store your sensitive data like passwords, endpoints, etc.
![img_4.png](images/resourceprincipal-5.png)
@@ -186,6 +195,18 @@ You can create a **Vault** and the secrets for use in your function code:
Now, you can specify the **Secret OCID** to obtain the secret. The code are protected by **Resource Principal**.
Declare the initialization for your secret client:
secret_client = oci.secrets.SecretsClient(config={}, signer=signer)
Then you can obtain the secret value specifying your secret **OCID**:
ClientId = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
ClientSecret = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
See the [authRPApi.py](./files/authApi/authRPApi.py) code with the changes to obtain your secrets. Remember to rename the **authRPApi.py** to **func.py** and build your function to test.
## applyValidationApi
The validation from OpenAPI spec is not possible today. The OCI API Gateway can implement validations with Authorization function. In the process of deploying the API from the OpenAPI spec, we can get the definitions and save it as a HEADER transformation, OCI API Gateway can do it for us, but cannot be used inside the authorization function because the HEADER transformation cannot be loaded in the function runtime execution.
@@ -296,4 +317,5 @@ To create an automation to:
- [Adding Context Variables to Policies and HTTP Back End Definitions](https://docs.oracle.com/en-us/iaas/Content/APIGateway/Tasks/apigatewaycontextvariables.htm)
- [IDCS API Rate Limits](https://docs.oracle.com/en/cloud/paas/identity-cloud/uaids/oracle-identity-cloud-service-pricing-models.html#GUID-C1505A67-9C21-484A-8395-04C4253FA1CD)
- [Create Policies to Control Access to Network and API Gateway-Related Resources](https://docs.oracle.com/en-us/iaas/Content/APIGateway/Tasks/apigatewaycreatingpolicies.htm)
- [SDK Authentication Methods](https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdk_authentication_methods.htm)
- [SDK Authentication Methods](https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdk_authentication_methods.htm)
- [Using the OCI Instance Principals and Vault with Python to retrieve a Secret](https://www.ateam-oracle.com/post/using-the-oci-instance-principals-and-vault-with-python-to-retrieve-a-secret)

292
files/authApi/authRPApi.py Normal file
View File

@@ -0,0 +1,292 @@
import base64
import json
import io
from fdk import response
import oci
import requests
import time
from openapi_schema_validator import validate
import os
import ast
from bravado_core.spec import Spec
from bravado_core.validate import validate_object
#### IDCS Routines
#### https://docs.oracle.com/en/learn/apigw-modeldeployment/index.html#introduction
#### https://docs.oracle.com/en/learn/migrate-api-to-api-gateway/#introduction
def auth_idcs(token, url, clientID, secretID):
url = url + "/oauth2/v1/introspect"
auth = clientID + ":" + secretID
auth_bytes = auth.encode("ascii")
auth_base64_bytes = base64.b64encode(auth_bytes)
auth_base64_message = auth_base64_bytes.decode("ascii")
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Basic ' + auth_base64_message
}
payload = "token=" + token
response = requests.request("POST", url, headers=headers, data=payload)
return response
def beautify_str(str_msg):
msg = str(str_msg.encode('unicode_escape').decode("utf-8")).replace("\\n", " ")
split_str = msg.split()
return " ".join(split_str)
###
def replace_regex(variavel):
variavel = variavel.replace("\\d", "[0-9]")
variavel = variavel.replace("\\D", "[^0-9]")
variavel = variavel.replace("\\.", "[.]")
variavel = variavel.replace("\\w", "[a-zA-Z0-9_]")
variavel = variavel.replace("\\W", "[^a-zA-Z0-9_]")
variavel = variavel.replace("/^", "^")
variavel = variavel.replace("$/", "$")
return variavel
def replace_escape_chars(obj):
for k, v in obj.items():
if isinstance(v, str):
obj[k] = replace_regex(v)
elif isinstance(v, dict):
obj[k] = replace_escape_chars(v)
elif isinstance(v, list):
for i in range(len(v)):
if isinstance(v[i], str):
v[i] = replace_regex(v[i])
elif isinstance(v[i], dict):
v[i] = replace_escape_chars(v[i])
return obj
def remove_property(dictionary, property_name):
keys_to_delete = [key for key in dictionary if key == property_name]
for key in keys_to_delete:
if ("\\s" in dictionary[key] or "\\S" in dictionary[key] or "\\w" in dictionary[key] or "\\W" in dictionary[key]
or "\\b" in dictionary[key] or "\\B" in dictionary[key] or "\\A" in dictionary[key] or "\\Z" in dictionary[key]):
del dictionary[key]
else:
dictionary[key] = replace_regex(dictionary[key])
for value in dictionary.values():
if isinstance(value, dict):
remove_property(value, property_name)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
remove_property(item, property_name)
return dictionary
def read_secret_value(secret_client, secret_id):
response = secret_client.get_secret_bundle(secret_id)
base64_Secret_content = response.data.secret_bundle_content.content
base64_secret_bytes = base64_Secret_content.encode('ascii')
base64_message_bytes = base64.b64decode(base64_secret_bytes)
secret_content = base64_message_bytes.decode('ascii')
return secret_content
def handler(ctx, data: io.BytesIO = None):
signer = oci.auth.signers.get_resource_principals_signer()
logging = oci.loggingingestion.LoggingClient(config={}, signer=signer)
secret_client = oci.secrets.SecretsClient(config={}, signer=signer)
# functions context variables
app_context = dict(ctx.Config())
jsonData = ""
try:
header = json.loads(data.getvalue().decode('utf-8'))["data"]
# IDCS Validation
url = "https://idcs-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.identity.oraclecloud.com"
ClientId = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
ClientSecret = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
try:
body = dict(json.loads(data.getvalue().decode('utf-8')).get("data"))["body"]
body = json.loads(body)
except:
body = None
# body content
body_schema_validation = None
try:
if (".apigatewayapi." not in header["body_schema_validation"]):
body_schema_validation = ast.literal_eval(header["body_schema_validation"])
else:
body_schema_validation = header["body_schema_validation"]
except:
body_schema_validation = None
# header values
access_token = header["token"]
authorization = auth_idcs(access_token, url, ClientId, ClientSecret)
try:
if (authorization.json().get("active") != True):
return response.Response(
ctx,
status_code=401,
response_data=json.dumps({"active": False, "wwwAuthenticate": jsonData})
)
except(Exception) as ex1:
jsonData = 'error parsing json payload: ' + str(ex1)
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
specversion="EXAMPLE-specversion-Value",
log_entry_batches=[
oci.loggingingestion.models.LogEntryBatch(
entries=[
oci.loggingingestion.models.LogEntry(
data="error(a): " + jsonData,
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
source="EXAMPLE-source-Value",
type="EXAMPLE-type-Value")]))
rdata = json.dumps({
"active": False,
"context": {
"status_code": 401,
"message": "Unauthorized",
"body": body,
"body_schema_validation": json.dumps(body_schema_validation),
"error": str(ex1)
}})
return response.Response(
ctx,
status_code=401,
response_data=rdata
)
rdata = json.dumps({
"active": True,
"context": {
"body": body,
"body_schema_validation": json.dumps(body_schema_validation)
}})
# Validate API spec
if (body_schema_validation != None):
if (".apigatewayapi." not in header["body_schema_validation"]):
# Com validacao direto por propriedades (sem schemas e referencias)
try:
validate(body, body_schema_validation["schema"])
return response.Response(
ctx, response_data=rdata,
status_code=200,
headers={"Content-Type": "application/json", "body": json.dumps(body)}
)
except(Exception) as ex2:
error_msg = beautify_str(str(ex2))
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
specversion="EXAMPLE-specversion-Value",
log_entry_batches=[
oci.loggingingestion.models.LogEntryBatch(
entries=[
oci.loggingingestion.models.LogEntry(
data="error(b): " + error_msg,
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
source="EXAMPLE-source-Value",
type="EXAMPLE-type-Value")]))
rdata = json.dumps({
"active": False,
"context": {
"status_code": 401,
"message": "Unauthorized",
"body": body,
"body_schema_validation": json.dumps(body_schema_validation),
"error": error_msg
}})
return response.Response(
ctx,
status_code=401,
response_data=rdata
)
else:
# Com schema de validação - Tanto swagger como Open API 3
try:
bravado_config = {
'validate_swagger_spec': False,
'validate_requests': False,
'validate_responses': False,
'use_models': True,
}
contents = body_schema_validation.split(",")
apigateway_client = oci.apigateway.ApiGatewayClient(config)
api_spec = apigateway_client.get_api_content(contents[1])
spec_dict = json.loads(api_spec.data.content)
spec_dict = remove_property(spec_dict, "pattern")
spec = Spec.from_dict(spec_dict, config=bravado_config)
try:
schema = spec_dict["definitions"][contents[0]]
except:
schema = spec_dict["components"]["schemas"][contents[0]]
schema_without_pattern = remove_property(schema, "pattern")
validate_object(spec, schema_without_pattern, body)
except (Exception) as ex3:
error_msg = beautify_str(str(ex3))
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
specversion="EXAMPLE-specversion-Value",
log_entry_batches=[
oci.loggingingestion.models.LogEntryBatch(
entries=[
oci.loggingingestion.models.LogEntry(
data="error(b): " + error_msg,
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
source="EXAMPLE-source-Value",
type="EXAMPLE-type-Value")]))
rdata = json.dumps({
"active": False,
"context": {
"status_code": 401,
"message": "Unauthorized",
"body": body,
"body_schema_validation": json.dumps(body_schema_validation),
"error": error_msg
}})
return response.Response(
ctx,
status_code=401,
response_data=rdata
)
return response.Response(
ctx, response_data=rdata,
status_code=200,
headers={"Content-Type": "application/json", "body_schema_validation": body_schema_validation, "body": json.dumps(body)}
)
except(Exception) as ex:
jsonData = 'error parsing json payload: ' + str(ex)
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
specversion="EXAMPLE-specversion-Value",
log_entry_batches=[
oci.loggingingestion.models.LogEntryBatch(
entries=[
oci.loggingingestion.models.LogEntry(
data="error(c): " + jsonData,
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
source="EXAMPLE-source-Value",
type="EXAMPLE-type-Value")]))
pass
return response.Response(
ctx,
status_code=401,
response_data=json.dumps({"active": False, "wwwAuthenticate": jsonData})
)