# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from typing import Awaitable, Callable, Dict, List, Optional, Set, Type
from functools import partial
import json
from aiohttp.web import Response, Request
import asyncio
from asyncio import Task

from aiohttp import ClientSession
from yarl import URL

from maubot import Plugin, MessageEvent
from maubot.handlers import command, event, web
from mautrix.types import EventType, Membership, MessageType, RoomID, StateEvent
from mautrix.util.config import BaseProxyConfig

from .db import Database
from .config import Config
from pprint import pprint


class WebhookBot(Plugin):
    """Maubot plugin bridging webhooks, Matrix rooms and FCM push messages.

    * ``POST /webhook/r0`` posts the ``message`` field of a JSON body into
      the room named by the ``room`` query parameter.
    * ``POST /webhook/r1`` replaces the stored GCM registration key.
    * The ``m`` command forwards its argument to the FCM send endpoint.
    """

    # Name of the single database entry holding the GCM registration key.
    _GCM_ENTRY = "sixtyfour"

    # Background webhook-processing tasks that have not finished yet.
    task_list: List[Task]
    # Rooms the bot is currently joined to; used to authorize /webhook/r0.
    joined_rooms: Set[RoomID]

    async def start(self) -> None:
        """Load the config, open the database and cache the joined rooms."""
        await super().start()
        self.config.load_and_update()
        self.db = Database(self.database)
        self.joined_rooms = set(await self.client.get_joined_rooms())
        self.task_list = []

    async def stop(self) -> None:
        """Give in-flight webhook tasks up to one second to finish."""
        if self.task_list:
            await asyncio.wait(self.task_list, timeout=1)

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class used for this plugin."""
        return Config

    @event.on(EventType.ROOM_MEMBER)
    async def member_handler(self, evt: StateEvent) -> None:
        """
        updates the stored joined_rooms object whenever
        the bot joins or leaves a room.
        """
        if evt.state_key != self.client.mxid:
            return

        membership = evt.content.membership
        if membership in (Membership.LEAVE, Membership.BAN):
            # discard(), not remove(): a leave/ban can arrive for a room that
            # was never in the set (e.g. a rejected invite), which would
            # otherwise raise KeyError.
            self.joined_rooms.discard(evt.room_id)
        elif membership == Membership.JOIN:
            self.joined_rooms.add(evt.room_id)

    # sending GCM message
    @command.new("m", help="Send message to 8x8")
    @command.argument("message", pass_raw=True)
    async def outgoing_message_handler(self, evt: MessageEvent, message: str) -> None:
        """Forward the command argument to the stored GCM registration ID.

        The request is sent to the FCM send endpoint, authorized with the
        ``gcm-server-key`` config value. Error statuses are logged.
        """
        url: URL = URL("https://fcm.googleapis.com/fcm/send")
        headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Authorization": f"key={self.config['gcm-server-key']}",
        }
        req = {
            "data": {
                "message": str(message),
                "title": str(evt.room_id),
                "collapse_key": "do_not_collapse",
                "event": "message",
                "content-available": "1",
            },
            # NOTE(review): get_gcm's return type is not visible from here, so
            # the original %-formatting is kept as is.
            "registration_ids": ["%s" % self.db.get_gcm(self._GCM_ENTRY)],
        }
        async with ClientSession() as sess:
            # Use the response as a context manager so the connection is
            # released, and surface failures instead of dropping them.
            async with sess.post(url, headers=headers, data=json.dumps(req)) as resp:
                if resp.status >= 400:
                    self.log.warning("FCM request failed with HTTP %d", resp.status)
    # End sending GCM message

    # Shared webhook helpers
    @staticmethod
    def _check_json_body(request: Request) -> Optional[Response]:
        """Return an error response if the request cannot carry a JSON body.

        Returns ``None`` when the request is acceptable.
        """
        # content_type is the media type without parameters, so
        # "application/json; charset=utf-8" is accepted as well.
        if request.content_type != "application/json":
            return Response(status=406, text="406: Not Acceptable\n",
                            headers={"Accept": "application/json"})
        if not request.can_read_body:
            return Response(status=400, text="400: Bad request\n"
                                             "Missing request body\n")
        return None

    async def _start_hook(self, request: Request,
                          processor: Callable[[Request], Awaitable[None]]) -> None:
        """Buffer the request body, then run ``processor`` as a tracked task."""
        # The task runs after the handler has returned its response, when the
        # payload stream may no longer be readable. read() caches the body on
        # the request, so the later req.json() works from the cached bytes.
        await request.read()
        task = self.loop.create_task(processor(request))
        self.task_list.append(task)

    def _forget_current_task(self) -> None:
        """Drop the currently running task from ``task_list`` if present."""
        task = asyncio.current_task()
        if task in self.task_list:
            self.task_list.remove(task)
    # end Shared webhook helpers

    # Webhook handling
    @web.post("/webhook/r0")
    async def post_handler(self, request: Request) -> Response:
        """Validate a message webhook and start processing it.

        Responds 400 without a ``room`` query parameter or body, 403 when the
        bot is not in that room, 406 for non-JSON content, otherwise 202.
        """
        if "room" not in request.query:
            return Response(text="400: Bad request\n"
                                 "No room specified. Did you forget the '?room=' query parameter?\n",
                            status=400)
        if request.query["room"] not in self.joined_rooms:
            return Response(text="403: Forbidden\nThe bot is not in the room. "
                                 "Please invite the bot to the room.\n",
                            status=403)
        rejection = self._check_json_body(request)
        if rejection is not None:
            return rejection

        await self._start_hook(request, self.process_hook_01)
        return Response(status=202, text="202: Accepted\nWebhook processing started.\n")

    async def process_hook_01(self, req: Request) -> None:
        """Send the ``message`` field of the JSON body to the requested room.

        The body's ``secret`` must equal the ``webhook-secret`` config value.
        Failures are logged and never raised.
        """
        try:
            if self.config["send_as_notice"]:
                msgtype = MessageType.NOTICE
            else:
                msgtype = MessageType.TEXT

            body = await req.json()
            if body["secret"] != self.config["webhook-secret"]:
                self.log.error("Failed to handle event: secret doesnt match.")
                return

            msg = f"{body['message']}"
            if msg:
                room_id = RoomID(req.query["room"])
                await self.client.send_markdown(room_id, msg, allow_html=True,
                                                msgtype=msgtype)
        except Exception:
            self.log.error("Failed to handle event", exc_info=True)
        finally:
            # finally, so the task is also de-registered on cancellation.
            self._forget_current_task()
    # end Webhook handling

    # Update GCM Key
    @web.post("/webhook/r1")
    async def post_handler_02(self, request: Request) -> Response:
        """Validate a GCM key update webhook and start processing it.

        Responds 400 without a ``gcm`` query parameter or body, 406 for
        non-JSON content, otherwise 202.
        """
        # NOTE(review): the query parameter is only checked for presence; the
        # key that gets stored comes from the JSON body (see process_hook_02).
        if "gcm" not in request.query:
            return Response(text="400: Bad request\n"
                                 "No GCM Key included\n",
                            status=400)
        rejection = self._check_json_body(request)
        if rejection is not None:
            return rejection

        await self._start_hook(request, self.process_hook_02)
        return Response(status=202, text="202: Accepted\nWebhook processing started.\n")

    async def process_hook_02(self, req: Request) -> None:
        """Replace the stored GCM key with the ``gcm`` field of the JSON body.

        The body's ``secret`` must equal the ``webhook-secret`` config value.
        Failures are logged and never raised.
        """
        try:
            body = await req.json()
            if body["secret"] != self.config["webhook-secret"]:
                self.log.error("Failed to handle event: secret doesnt match.")
                return

            if self.db.has_gcm(self._GCM_ENTRY):
                self.db.rm_gcm(self._GCM_ENTRY)
            self.db.add_gcm(self._GCM_ENTRY, body["gcm"])
            self.log.info("Updated GCM key: %s", body["gcm"])
        except Exception:
            self.log.error("Failed to handle event", exc_info=True)
        finally:
            # finally, so the task is also de-registered on cancellation.
            self._forget_current_task()
    # end Update GCM Key