adjustments in redaction code

This commit is contained in:
2024-09-10 17:15:03 -03:00
parent c2385c6cda
commit a9bbb8aba3
9 changed files with 329 additions and 34 deletions

View File

@@ -10,6 +10,43 @@ import os
import ast
from bravado_core.spec import Spec
from bravado_core.validate import validate_object
from datetime import datetime
from random import randrange
import Redaction
SENSITIVE_PATTERNS = [
r"\d{3}-\d{2}-\d{4}", # Social Security Number (SSN) pattern
r"\d{4}[-\s]\d{4}[-\s]\d{4}[-\s]\d{4}", # Credit card number pattern
r"\(?\d{3}\)?[-\s.]?\d{3}[-\s.]?\d{4}", # Phone number
r"(0[1-9]|1[0-2])[-/.](0[1-9]|[12][0-9]|3[01])[-/.](19|20)\d\d", # date of birth
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", # IP address
r"[a-zA-Z0-9]{32}", # API key
r"^(\d{5}.\d{2}-\d)|(\d{8})$"
]
ATTRIBUTE_PATTERNS = [
"documentNumber",
"documentCustodyAgentAccountCode",
"isinCode",
"payingAgentAccountCode",
"registrationParticipantAccountCode",
"nome",
"$ref",
"cpf",
"teste",
"valor",
"original",
"type",
"solicitacaoPagador",
"expiracao",
"chave",
"description",
"items",
"required",
"x-scope",
"maxLength"
]
#### IDCS Routines
#### https://docs.oracle.com/en/learn/apigw-modeldeployment/index.html#introduction
@@ -51,20 +88,6 @@ def replace_regex(variavel):
return variavel
def replace_escape_chars(obj):
for k, v in obj.items():
if isinstance(v, str):
obj[k] = replace_regex(v)
elif isinstance(v, dict):
obj[k] = replace_escape_chars(v)
elif isinstance(v, list):
for i in range(len(v)):
if isinstance(v[i], str):
v[i] = replace_regex(v[i])
elif isinstance(v[i], dict):
v[i] = replace_escape_chars(v[i])
return obj
def remove_property(dictionary, property_name):
keys_to_delete = [key for key in dictionary if key == property_name]
for key in keys_to_delete:
@@ -82,6 +105,14 @@ def remove_property(dictionary, property_name):
remove_property(item, property_name)
return dictionary
def count_attributes(json_data):
count = 0
for key, value in json_data.items():
count += 1
if isinstance(value, dict):
count += count_attributes(value)
return count
def read_secret_value(secret_client, secret_id):
response = secret_client.get_secret_bundle(secret_id)
base64_Secret_content = response.data.secret_bundle_content.content
@@ -91,9 +122,8 @@ def read_secret_value(secret_client, secret_id):
return secret_content
def handler(ctx, data: io.BytesIO = None):
signer = oci.auth.signers.get_resource_principals_signer()
logging = oci.loggingingestion.LoggingClient(config={}, signer=signer)
secret_client = oci.secrets.SecretsClient(config={}, signer=signer)
config = oci.config.from_file("config")
logging = oci.loggingingestion.LoggingClient(config)
# functions context variables
app_context = dict(ctx.Config())
@@ -108,6 +138,25 @@ def handler(ctx, data: io.BytesIO = None):
ClientId = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
ClientSecret = read_secret_value(secret_client, "ocid1.vaultsecret.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
# JSON Items counter
jsonData = dict(json.loads(data.getvalue().decode('utf-8')).get("data"))["body"]
jsonData = dict(json.loads(jsonData))
c = count_attributes(jsonData)
if (c > 21):
rdata = json.dumps({
"active": False,
"context": {
"status_code": 401,
"message": "JSON exception",
"error": "JSON exception",
}})
return response.Response(
ctx,
status_code=401,
response_data=rdata
)
try:
body = dict(json.loads(data.getvalue().decode('utf-8')).get("data"))["body"]
body = json.loads(body)
@@ -184,6 +233,8 @@ def handler(ctx, data: io.BytesIO = None):
)
except(Exception) as ex2:
error_msg = beautify_str(str(ex2))
redaction = Redaction.Redaction()
error_msg = redaction.redact(sensitive_pattern=SENSITIVE_PATTERNS, attribute_pattern=ATTRIBUTE_PATTERNS, message=error_msg)
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
@@ -225,15 +276,20 @@ def handler(ctx, data: io.BytesIO = None):
api_spec = apigateway_client.get_api_content(contents[1])
spec_dict = json.loads(api_spec.data.content)
spec_dict = remove_property(spec_dict, "pattern")
spec = Spec.from_dict(spec_dict, config=bravado_config)
try:
schema = spec_dict["definitions"][contents[0]]
except:
schema = spec_dict["components"]["schemas"][contents[0]]
schema_without_pattern = remove_property(schema, "pattern")
validate_object(spec, schema_without_pattern, body)
except (Exception) as ex3:
error_msg = beautify_str(str(ex3))
redaction = Redaction.Redaction()
error_msg = redaction.redact(sensitive_pattern=SENSITIVE_PATTERNS, attribute_pattern=ATTRIBUTE_PATTERNS, message=error_msg)
put_logs_response = logging.put_logs(
log_id="ocid1.log.oc1.iad.amaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaanamaaaaaan",
put_logs_details=oci.loggingingestion.models.PutLogsDetails(