pyhOn/pyhon/connection/auth.py

292 lines
11 KiB
Python
Raw Permalink Normal View History

2023-02-13 00:41:38 +00:00
import json
import logging
import re
import secrets
import urllib
2023-04-15 13:55:22 +00:00
from contextlib import suppress
2023-04-15 12:37:27 +00:00
from dataclasses import dataclass
from datetime import datetime, timedelta
2023-07-01 12:59:09 +00:00
from typing import Dict, Optional, Any, List
2023-02-13 00:41:38 +00:00
from urllib import parse
2023-04-12 00:09:41 +00:00
from urllib.parse import quote
2023-02-13 00:41:38 +00:00
2023-06-28 17:02:11 +00:00
import aiohttp
2023-04-15 12:22:04 +00:00
from aiohttp import ClientResponse
2023-03-17 00:56:04 +00:00
from yarl import URL
2023-02-13 00:41:38 +00:00
2023-04-12 17:14:14 +00:00
from pyhon import const, exceptions
2023-06-28 17:02:11 +00:00
from pyhon.connection.device import HonDevice
2023-04-15 12:22:04 +00:00
from pyhon.connection.handler.auth import HonAuthConnectionHandler
2023-02-13 00:41:38 +00:00
2023-04-10 18:31:55 +00:00
_LOGGER = logging.getLogger(__name__)
2023-02-13 00:41:38 +00:00
2023-04-15 12:37:27 +00:00
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
2023-06-28 17:02:11 +00:00
loaded: Optional[Dict[str, Any]] = None
2023-04-15 12:37:27 +00:00
2023-07-16 03:53:23 +00:00
@dataclass
class HonAuthData:
access_token: str = ""
refresh_token: str = ""
cognito_token: str = ""
id_token: str = ""
2023-02-13 00:41:38 +00:00
class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
2023-06-28 17:02:11 +00:00
def __init__(
self,
session: aiohttp.ClientSession,
email: str,
password: str,
device: HonDevice,
) -> None:
2023-04-09 16:13:50 +00:00
self._session = session
2023-04-15 12:22:04 +00:00
self._request = HonAuthConnectionHandler(session)
2023-04-15 12:37:27 +00:00
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
2023-04-09 16:13:50 +00:00
self._device = device
self._expires: datetime = datetime.utcnow()
2023-07-16 03:53:23 +00:00
self._auth = HonAuthData()
2023-02-13 00:41:38 +00:00
@property
2023-04-15 12:22:04 +00:00
def cognito_token(self) -> str:
2023-07-16 03:53:23 +00:00
return self._auth.cognito_token
2023-02-13 00:41:38 +00:00
@property
2023-04-15 12:22:04 +00:00
def id_token(self) -> str:
2023-07-16 03:53:23 +00:00
return self._auth.id_token
2023-02-13 00:41:38 +00:00
2023-03-17 00:56:04 +00:00
@property
2023-04-15 12:22:04 +00:00
def access_token(self) -> str:
2023-07-16 03:53:23 +00:00
return self._auth.access_token
2023-03-17 00:56:04 +00:00
@property
2023-04-15 12:22:04 +00:00
def refresh_token(self) -> str:
2023-07-16 03:53:23 +00:00
return self._auth.refresh_token
2023-03-17 00:56:04 +00:00
2023-04-15 12:22:04 +00:00
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
@property
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
2023-04-15 12:22:04 +00:00
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
2023-04-12 17:14:14 +00:00
raise exceptions.HonAuthenticationError("Can't login")
2023-04-15 13:55:22 +00:00
@staticmethod
def _generate_nonce() -> str:
2023-03-17 00:56:04 +00:00
nonce = secrets.token_hex(16)
2023-04-15 12:22:04 +00:00
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
2023-04-15 13:55:22 +00:00
async def _load_login(self) -> bool:
2023-04-15 12:22:04 +00:00
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
2023-04-15 13:55:22 +00:00
return await self._login_url(login_url)
2023-04-15 12:22:04 +00:00
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
2023-03-17 00:56:04 +00:00
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
2023-04-15 12:22:04 +00:00
"redirect_uri": redirect_uri,
2023-03-17 00:56:04 +00:00
"display": "touch",
"scope": "api openid refresh_token web",
2023-04-15 12:22:04 +00:00
"nonce": self._generate_nonce(),
2023-03-17 00:56:04 +00:00
}
2023-04-15 13:55:22 +00:00
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
2023-04-15 12:22:04 +00:00
async with self._request.get(url) as response:
2023-04-12 17:14:14 +00:00
text = await response.text()
self._expires = datetime.utcnow()
2023-07-01 12:59:09 +00:00
login_url: List[str] = re.findall("url = '(.+?)'", text)
if not login_url:
2023-04-12 17:14:14 +00:00
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
2023-04-15 12:22:04 +00:00
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
2023-04-15 13:55:22 +00:00
if not (new_location := response.headers.get("Location", "")):
2023-04-15 12:22:04 +00:00
await self._error_logger(response)
return new_location
2023-06-28 17:02:11 +00:00
async def _handle_redirects(self, login_url: str) -> str:
2023-04-15 12:22:04 +00:00
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
2023-04-15 12:37:27 +00:00
async def _login_url(self, login_url: str) -> bool:
2023-04-15 12:22:04 +00:00
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
2023-03-17 00:56:04 +00:00
fw_uid, loaded_str = context[0]
2023-04-15 12:37:27 +00:00
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
2023-06-07 00:27:02 +00:00
self._login_data.url = login_url.replace(const.AUTH_API, "")
2023-04-15 12:37:27 +00:00
return True
2023-04-15 12:22:04 +00:00
await self._error_logger(response)
2023-04-15 13:55:22 +00:00
return False
2023-03-17 00:56:04 +00:00
2023-04-15 13:55:22 +00:00
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
2023-04-15 12:37:27 +00:00
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
2023-06-07 00:27:02 +00:00
"username": self._login_data.email,
"password": self._login_data.password,
2023-04-15 12:37:27 +00:00
"startUrl": start_url,
2023-02-13 00:41:38 +00:00
},
2023-04-15 12:37:27 +00:00
}
data = {
"message": {"actions": [action]},
2023-02-13 00:41:38 +00:00
"aura.context": {
"mode": "PROD",
2023-04-15 12:37:27 +00:00
"fwuid": self._login_data.fw_uid,
2023-02-13 00:41:38 +00:00
"app": "siteforce:loginApp2",
2023-04-15 12:37:27 +00:00
"loaded": self._login_data.loaded,
2023-02-13 00:41:38 +00:00
"dn": [],
"globals": {},
2023-04-09 18:55:36 +00:00
"uad": False,
},
2023-04-15 12:37:27 +00:00
"aura.pageURI": self._login_data.url,
2023-04-09 18:55:36 +00:00
"aura.token": None,
}
2023-02-13 00:41:38 +00:00
params = {"r": 3, "other.LightningLoginCustom.login": 1}
2023-04-15 12:22:04 +00:00
async with self._request.post(
2023-04-09 18:55:36 +00:00
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
2023-06-07 00:27:02 +00:00
data="&".join(f"{k}={quote(json.dumps(v))}" for k, v in data.items()),
2023-04-09 18:55:36 +00:00
params=params,
2023-02-13 00:41:38 +00:00
) as response:
2023-03-17 00:56:04 +00:00
if response.status == 200:
2023-04-15 13:55:22 +00:00
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
2023-07-01 12:59:09 +00:00
url: str = result["events"][0]["attributes"]["values"]["url"]
return url
await self._error_logger(response)
2023-03-17 00:56:04 +00:00
return ""
2023-02-13 00:41:38 +00:00
2023-04-24 19:50:25 +00:00
def _parse_token_data(self, text: str) -> bool:
2023-04-12 17:14:14 +00:00
if access_token := re.findall("access_token=(.*?)&", text):
2023-07-16 03:53:23 +00:00
self._auth.access_token = access_token[0]
2023-04-12 17:14:14 +00:00
if refresh_token := re.findall("refresh_token=(.*?)&", text):
2023-07-16 03:53:23 +00:00
self._auth.refresh_token = refresh_token[0]
2023-04-12 17:14:14 +00:00
if id_token := re.findall("id_token=(.*?)&", text):
2023-07-16 03:53:23 +00:00
self._auth.id_token = id_token[0]
return bool(access_token and refresh_token and id_token)
2023-04-12 17:14:14 +00:00
2023-04-15 13:55:22 +00:00
async def _get_token(self, url: str) -> bool:
2023-04-15 12:22:04 +00:00
async with self._request.get(url) as response:
2023-04-10 18:31:55 +00:00
if response.status != 200:
await self._error_logger(response)
2023-03-17 00:56:04 +00:00
return False
2023-04-15 13:55:22 +00:00
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
2023-04-15 13:55:22 +00:00
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
2023-04-10 18:31:55 +00:00
if response.status != 200:
await self._error_logger(response)
2023-04-10 18:31:55 +00:00
return False
2023-04-15 13:55:22 +00:00
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
2023-06-07 00:27:02 +00:00
url = const.AUTH_API + url_search[0]
2023-04-15 12:22:04 +00:00
async with self._request.get(url) as response:
2023-04-10 18:31:55 +00:00
if response.status != 200:
await self._error_logger(response)
2023-02-13 00:41:38 +00:00
return False
2023-04-24 19:50:25 +00:00
if not self._parse_token_data(await response.text()):
await self._error_logger(response)
return False
2023-02-13 00:41:38 +00:00
return True
2023-04-15 13:55:22 +00:00
async def _api_auth(self) -> bool:
2023-07-16 03:53:23 +00:00
post_headers = {"id-token": self._auth.id_token}
2023-04-09 16:13:50 +00:00
data = self._device.get()
2023-04-15 12:22:04 +00:00
async with self._request.post(
2023-04-09 18:55:36 +00:00
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
2023-04-10 18:31:55 +00:00
) as response:
2023-04-09 16:13:50 +00:00
try:
2023-04-10 18:31:55 +00:00
json_data = await response.json()
2023-04-09 16:13:50 +00:00
except json.JSONDecodeError:
await self._error_logger(response)
2023-04-09 16:13:50 +00:00
return False
2023-07-16 03:53:23 +00:00
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._auth.cognito_token:
2023-04-21 21:34:10 +00:00
_LOGGER.error(json_data)
raise exceptions.HonAuthenticationError()
2023-02-13 00:41:38 +00:00
return True
2023-04-09 16:43:57 +00:00
2023-04-15 13:55:22 +00:00
async def authenticate(self) -> None:
2023-04-12 17:14:14 +00:00
self.clear()
try:
2023-04-15 12:37:27 +00:00
if not await self._load_login():
2023-04-12 17:14:14 +00:00
raise exceptions.HonAuthenticationError("Can't open login page")
2023-04-15 12:37:27 +00:00
if not (url := await self._login()):
2023-04-12 17:14:14 +00:00
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
2023-04-15 13:55:22 +00:00
async def refresh(self) -> bool:
2023-04-09 16:43:57 +00:00
params = {
"client_id": const.CLIENT_ID,
2023-07-16 03:53:23 +00:00
"refresh_token": self._auth.refresh_token,
2023-04-09 18:55:36 +00:00
"grant_type": "refresh_token",
2023-04-09 16:43:57 +00:00
}
2023-04-15 12:22:04 +00:00
async with self._request.post(
2023-04-09 18:55:36 +00:00
f"{const.AUTH_API}/services/oauth2/token", params=params
2023-04-10 18:31:55 +00:00
) as response:
if response.status >= 400:
await self._error_logger(response, fail=False)
2023-04-09 16:43:57 +00:00
return False
2023-04-10 18:31:55 +00:00
data = await response.json()
self._expires = datetime.utcnow()
2023-07-16 03:53:23 +00:00
self._auth.id_token = data["id_token"]
self._auth.access_token = data["access_token"]
2023-04-11 15:09:02 +00:00
return await self._api_auth()
2023-04-12 17:14:14 +00:00
2023-04-15 13:55:22 +00:00
def clear(self) -> None:
2023-04-12 17:14:14 +00:00
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
2023-04-15 12:22:04 +00:00
self._request.called_urls = []
2023-07-16 03:53:23 +00:00
self._auth.cognito_token = ""
self._auth.id_token = ""
self._auth.access_token = ""
self._auth.refresh_token = ""