Use dataclass for login data

This commit is contained in:
Andre Basche 2023-04-15 14:37:27 +02:00
parent 4a0ee8569b
commit b6ca12ebff

View file

@ -3,8 +3,10 @@ import logging
import re import re
import secrets import secrets
import urllib import urllib
from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pprint import pformat from pprint import pformat
from typing import Dict, Optional
from urllib import parse from urllib import parse
from urllib.parse import quote from urllib.parse import quote
@ -17,6 +19,15 @@ from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None
class HonAuth: class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8 _TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7 _TOKEN_EXPIRE_WARNING_HOURS = 7
@ -24,8 +35,9 @@ class HonAuth:
def __init__(self, session, email, password, device) -> None: def __init__(self, session, email, password, device) -> None:
self._session = session self._session = session
self._request = HonAuthConnectionHandler(session) self._request = HonAuthConnectionHandler(session)
self._email = email self._login_data = HonLoginData()
self._password = password self._login_data.email = email
self._login_data.password = password
self._access_token = "" self._access_token = ""
self._refresh_token = "" self._refresh_token = ""
self._cognito_token = "" self._cognito_token = ""
@ -77,7 +89,7 @@ class HonAuth:
async def _load_login(self): async def _load_login(self):
login_url = await self._introduce() login_url = await self._introduce()
login_url = await self._handle_redirects(login_url) login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url) await self._login_url(login_url)
async def _introduce(self) -> str: async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
@ -112,46 +124,47 @@ class HonAuth:
redirect2 = await self._manual_redirect(redirect1) redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> str: async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT} headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True) url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response: async with self._request.get(url, headers=headers) as response:
text = await response.text() text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0] fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str) self._login_data.fw_uid = fw_uid
result = login_url.replace("/".join(const.AUTH_API.split("/")[:-1]), "") self._login_data.loaded = json.loads(loaded_str)
return fw_uid, loaded, result self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return True
await self._error_logger(response) await self._error_logger(response)
async def _login(self, fw_uid, loaded, login_url): async def _login(self):
data = { start_url = parse.unquote(self._login_data.url.split("startURL=")[-1]).split(
"message": { "%3D"
"actions": [ )[0]
{ action = {
"id": "79;a", "id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login", "descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm", "callingDescriptor": "markup://c:loginForm",
"params": { "params": {
"username": quote(self._email), "username": quote(self._login_data.email),
"password": quote(self._password), "password": quote(self._login_data.password),
"startUrl": parse.unquote( "startUrl": start_url,
login_url.split("startURL=")[-1]
).split("%3D")[0],
}, },
} }
] data = {
}, "message": {"actions": [action]},
"aura.context": { "aura.context": {
"mode": "PROD", "mode": "PROD",
"fwuid": fw_uid, "fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2", "app": "siteforce:loginApp2",
"loaded": loaded, "loaded": self._login_data.loaded,
"dn": [], "dn": [],
"globals": {}, "globals": {},
"uad": False, "uad": False,
}, },
"aura.pageURI": login_url, "aura.pageURI": self._login_data.url,
"aura.token": None, "aura.token": None,
} }
params = {"r": 3, "other.LightningLoginCustom.login": 1} params = {"r": 3, "other.LightningLoginCustom.login": 1}
@ -222,9 +235,9 @@ class HonAuth:
async def authenticate(self): async def authenticate(self):
self.clear() self.clear()
try: try:
if not (login_site := await self._load_login()): if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page") raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)): if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login") raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url): if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token") raise exceptions.HonAuthenticationError("Can't get token")