Files
telegram-bot-for-manipulate…/app/middlewares/outer_middlewares.py
ronis_0505 5662d8877a
Some checks failed
continuous-integration/drone Build is failing
MVP
2025-07-27 22:17:28 +03:00

30 lines
1.1 KiB
Python

import logging
from typing import Any, Awaitable, Callable, Dict
from aiogram import BaseMiddleware, Bot
from aiogram.types import TelegramObject
from database import async_session_, Worker
from sqlalchemy import select
class AccessCheckMiddleware(BaseMiddleware):
sessions_in_memory_db = set()
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
event_data = event.message or event.callback_query
user = event_data.from_user.id
if user not in self.sessions_in_memory_db:
async with async_session_() as session:
async with session.begin():
result = await session.execute(select(Worker).where(Worker.telegram_id == event_data.from_user.id))
user = result.scalars().first()
if user:
self.sessions_in_memory_db.add(event_data.from_user.id)
return await handler(event, data)
return None