mirror of
https://github.com/hoshikawa2/fn-apigw-auth-draft-cavage-http-signatures-08.git
synced 2026-03-06 02:10:39 +00:00
170 lines
7.2 KiB
Python
170 lines
7.2 KiB
Python
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
import base64
|
|
import json
|
|
from datetime import datetime
|
|
import io
|
|
from fdk import response
|
|
import oci
|
|
import requests
|
|
|
|
def get_date():
|
|
d = str(datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
|
|
return d
|
|
|
|
def get_signing(d, streaming_host, oci_region):
|
|
with open('oci_api_key.pem', 'rb') as key_file:
|
|
private_key = load_pem_private_key(key_file.read(), password=None)# Dados para assinar
|
|
str = b'(request-target): post /20180418/streams/<streaming_host>/groupCursors\ndate: <date_str>\nhost: streaming.<oci_region>.oci.oraclecloud.com'# Assine os dados usando SHA-256 e a chave privada
|
|
|
|
data = str.replace(b'<date_str>', bytes(d.encode())).replace(b'<streaming_host>', bytes(streaming_host.encode())).replace(b'<oci_region>', bytes(oci_region.encode()))
|
|
signature = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())# Imprima a assinatura
|
|
|
|
base64_encoded = base64.b64encode(signature)
|
|
|
|
return base64_encoded
|
|
|
|
def get_authorization(d, streaming_host, oci_region, tenancy, user, fingerprint):
|
|
a = get_signing(d, streaming_host, oci_region)
|
|
s = b'Signature algorithm="rsa-sha256",headers="(request-target) date host",keyId="<tenancy>/<user>/<fingerprint>",signature="<signature>",version="1"'
|
|
s = s.replace(b'<signature>', a).replace(b'<tenancy>', bytes(tenancy.encode())).replace(b'<user>', bytes(user.encode())).replace(b'<fingerprint>', bytes(fingerprint.encode()))
|
|
r = s.decode()
|
|
return r
|
|
|
|
def auth_idcs(token, url, clientID, secretID):
|
|
url = url + "/oauth2/v1/introspect"
|
|
|
|
auth = clientID + ":" + secretID
|
|
auth_bytes = auth.encode("ascii")
|
|
auth_base64_bytes = base64.b64encode(auth_bytes)
|
|
auth_base64_message = auth_base64_bytes.decode("ascii")
|
|
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
'Authorization': 'Basic ' + auth_base64_message
|
|
}
|
|
|
|
payload = "token=" + token
|
|
|
|
response = requests.request("POST", url, headers=headers, data=payload)
|
|
return response
|
|
|
|
#Function used to load the configurations from the config.json file
|
|
def getOptions():
|
|
fo = open("config.json", "r")
|
|
config = fo.read()
|
|
options = json.loads(config)
|
|
return options
|
|
|
|
def handler(ctx, data: io.BytesIO = None):
|
|
config = oci.config.from_file("config")
|
|
logging = oci.loggingingestion.LoggingClient(config)
|
|
tenancy = config['tenancy']
|
|
user = config['user']
|
|
fingerprint = config['fingerprint']
|
|
|
|
app_context = dict(ctx.Config())
|
|
streaming_host = app_context['streaming_host']
|
|
oci_region = app_context['oci_region']
|
|
jsonData = ""
|
|
|
|
options = getOptions()
|
|
|
|
try:
|
|
header = json.loads(data.getvalue().decode('utf-8'))["data"]
|
|
access_token = header["token"]
|
|
url = options["BaseUrl"]
|
|
|
|
authorization = auth_idcs(access_token, url, options["ClientId"], options["ClientSecret"])
|
|
if authorization.json().get("active") == False:
|
|
return response.Response(
|
|
ctx,
|
|
status_code=401,
|
|
response_data=json.dumps({"active": False, "wwwAuthenticate": jsonData})
|
|
)
|
|
|
|
d = get_date()
|
|
a = get_authorization(d, streaming_host=streaming_host, oci_region=oci_region, tenancy=tenancy, user=user, fingerprint=fingerprint)
|
|
|
|
rdata = json.dumps({
|
|
"active": True,
|
|
"context": {
|
|
"date": d,
|
|
"authorization": a,
|
|
"streaming_host": streaming_host,
|
|
"oci_region": oci_region,
|
|
"tenancy": tenancy,
|
|
"user": user,
|
|
"fingerprint": fingerprint
|
|
}})
|
|
|
|
put_logs_response = logging.put_logs(
|
|
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
|
|
specversion="EXAMPLE-specversion-Value",
|
|
log_entry_batches=[
|
|
oci.loggingingestion.models.LogEntryBatch(
|
|
entries=[
|
|
oci.loggingingestion.models.LogEntry(
|
|
data="authorization: " + str(a),
|
|
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
|
|
source="EXAMPLE-source-Value",
|
|
type="EXAMPLE-type-Value")]))
|
|
|
|
put_logs_response = logging.put_logs(
|
|
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
|
|
specversion="EXAMPLE-specversion-Value",
|
|
log_entry_batches=[
|
|
oci.loggingingestion.models.LogEntryBatch(
|
|
entries=[
|
|
oci.loggingingestion.models.LogEntry(
|
|
data="request payload: " + json.dumps(header),
|
|
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value-1")],
|
|
source="EXAMPLE-source-Value",
|
|
type="EXAMPLE-type-Value")]))
|
|
|
|
put_logs_response = logging.put_logs(
|
|
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
|
|
specversion="EXAMPLE-specversion-Value",
|
|
log_entry_batches=[
|
|
oci.loggingingestion.models.LogEntryBatch(
|
|
entries=[
|
|
oci.loggingingestion.models.LogEntry(
|
|
data="access: " + json.dumps(authorization.text),
|
|
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value-1")],
|
|
source="EXAMPLE-source-Value",
|
|
type="EXAMPLE-type-Value")]))
|
|
|
|
|
|
return response.Response(
|
|
ctx, response_data=rdata,
|
|
status_code=200,
|
|
headers={"Content-Type": "application/json", "Authorization": a, "Date": d}
|
|
)
|
|
|
|
except(Exception) as ex:
|
|
jsonData = 'error parsing json payload: ' + str(ex)
|
|
put_logs_response = logging.put_logs(
|
|
log_id="ocid1.log.oc1.iad.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
put_logs_details=oci.loggingingestion.models.PutLogsDetails(
|
|
specversion="EXAMPLE-specversion-Value",
|
|
log_entry_batches=[
|
|
oci.loggingingestion.models.LogEntryBatch(
|
|
entries=[
|
|
oci.loggingingestion.models.LogEntry(
|
|
data="error: " + jsonData,
|
|
id="ocid1.test.oc1..00000001.EXAMPLE-id-Value")],
|
|
source="EXAMPLE-source-Value",
|
|
type="EXAMPLE-type-Value")]))
|
|
|
|
pass
|
|
|
|
return response.Response(
|
|
ctx,
|
|
status_code=401,
|
|
response_data=json.dumps({"active": False, "wwwAuthenticate": jsonData})
|
|
)
|