pyhOn/pyhon/connection/handler/anonym.py

28 lines
915 B
Python
Raw Normal View History

2023-04-15 12:22:04 +00:00
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
2023-06-28 17:02:11 +00:00
from typing import Dict, Any
import aiohttp
2023-06-28 18:25:52 +00:00
from yarl import URL
2023-04-15 12:22:04 +00:00
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
2023-06-28 17:02:11 +00:00
from pyhon.typedefs import Callback
2023-04-15 12:22:04 +00:00
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
    """Connection handler that attaches the static API key to every request.

    The handler extends the base class headers with an ``x-api-key`` entry
    and merges them into each intercepted request.
    """

    # Base-class headers plus the API key taken from ``const.API_KEY``.
    _HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}

    @asynccontextmanager
    async def _intercept(
        self, method: Callback, url: str | URL, *args: Any, **kwargs: Any
    ) -> AsyncIterator[aiohttp.ClientResponse]:
        """Issue a request with the handler headers merged in.

        Args:
            method: Callable that performs the request; its result is used
                as an async context manager producing the response.
            url: Target of the request.
            *args: Extra positional arguments forwarded to ``method``.
            **kwargs: Extra keyword arguments forwarded to ``method``. An
                optional ``headers`` mapping is merged with ``_HEADERS``.

        Yields:
            The response object, still open while the caller's ``async with``
            body runs.
        """
        # A caller may pass ``headers=None`` explicitly; treat that the same
        # as "no headers" instead of failing while merging.
        caller_headers = kwargs.pop("headers", None) or {}
        # Handler headers are unpacked last, so they win on key collisions.
        kwargs["headers"] = {**caller_headers, **self._HEADERS}
        async with method(url, *args, **kwargs) as response:
            if response.status == 403:
                # Only logged; the response is still handed to the caller.
                _LOGGER.error("Can't authenticate anymore")
            yield response