# commit 4fd3d49af164202b81bcfbb209babab8228fb999
# Author: Gabriel Huber
# Date:   Thu May 15 13:51:54 2025 +0200
#
#     Test cat images
#
# This text is a dump of one commit that adds three new files.  The added
# contents follow, one section per file, reformatted as plain Python.
#
# ---- .gitignore ----
# (diff header in the dump: new file mode 100644, index 0000000..fa5212d)
#     bot.ini
#     __pycache__
#
# ---- previewbot.py ----
# (diff header in the dump: new file mode 100644, index 0000000..d0cfefc)

import asyncio
import configparser
import logging
import re

import slixmpp
from slixmpp.exceptions import IqError, IqTimeout

import urlpreview

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Matches http(s) URLs up to the first whitespace or '#' (fragments are cut).
url_regex = re.compile(r"https?://[^\s#]+")

# Keyword that triggers the cat picture, and the picture that is sent.
CAT_TRIGGER = "kuschelkatze"
CAT_IMAGE_URL = "http://bastiodon.lan/gabriel/ba8af1391c85f763.jpeg"
CAT_IMAGE_DESC = "Katzenbild"


class PreviewBot(slixmpp.ClientXMPP):
    """XMPP client that replies to messages containing URLs with page titles.

    For every direct or group-chat message the bot either

    * answers with one line per URL found in the body (the title returned by
      ``urlpreview.get_preview`` or a "could not fetch" notice), or
    * answers with a cat picture when the body contains ``CAT_TRIGGER`` and
      no URL.

    Args:
        jid: JID used to log in.
        password: Password for ``jid``.
        nick: Nickname used when joining the rooms.
        rooms: Iterable of room JIDs to join once the session has started.
    """

    def __init__(self, jid, password, nick, rooms):
        super().__init__(jid, password)

        self.nick = nick
        self.rooms = rooms

        self.register_plugin("xep_0030")  # Service Discovery
        self.register_plugin("xep_0045")  # Multi-User Chat
        self.register_plugin("xep_0199")  # XMPP Ping
        self.register_plugin("xep_0066")  # Out of band data

        self.add_event_handler("session_start", self.on_session_start)
        self.add_event_handler("message", self.on_message)
        self.add_event_handler("groupchat_message", self.on_muc_message)

    def on_session_start(self, event):
        """Announce presence and join every configured room."""
        self.send_presence()
        for room in self.rooms:
            self.plugin["xep_0045"].join_muc(room, self.nick)

    async def on_message(self, msg):
        """Handle a direct ("chat" or "normal") message."""
        if msg["type"] not in ("chat", "normal"):
            return
        # The original compared the sender JID with the nickname, which never
        # matches.  Compare bare JIDs so the bot really ignores its own account.
        if msg["from"].bare == self.boundjid.bare:
            return
        await self._respond(msg)

    async def on_muc_message(self, msg):
        """Handle a group-chat message, skipping the bot's own messages."""
        if msg["type"] != "groupchat":
            return
        # A room reflects the bot's replies back to it, and those replies
        # contain URLs.  Without this nickname check the bot answers itself
        # forever.  "mucnick" is the sender nickname exposed on group-chat
        # messages by the xep_0045 plugin registered in __init__.
        if msg["mucnick"] == self.nick:
            return
        await self._respond(msg)

    async def _respond(self, msg):
        """Send the preview reply or the cat picture for ``msg``, if any."""
        body = msg["body"]
        url_matches = url_regex.findall(body)
        if url_matches:
            preview_text = await self._build_preview_text(url_matches)
            msg.reply(preview_text).send()
            return
        if CAT_TRIGGER in body:
            self._send_cat_image(msg)

    async def _build_preview_text(self, urls):
        """Fetch all previews concurrently and return one line per URL."""
        print("Fetching previews for:", urls)
        # return_exceptions=True: one broken URL must not discard the
        # previews of the other URLs in the same message.
        results = await asyncio.gather(
            *(urlpreview.get_preview(url) for url in urls),
            return_exceptions=True,
        )
        preview_lines = []
        for url, result in zip(urls, results):
            if isinstance(result, BaseException):
                logger.warning("Preview for %s failed: %r", url, result)
                result = None
            if result is None:
                preview_lines.append(f"Could not fetch preview for {url}")
            else:
                preview_lines.append(f"Title: {result}")
        return "\n".join(preview_lines)

    def _send_cat_image(self, msg):
        """Reply to ``msg`` with the cat picture as body and out-of-band URL."""
        reply_msg = msg.reply()
        reply_msg["body"] = CAT_IMAGE_URL
        reply_msg["oob"]["url"] = CAT_IMAGE_URL
        reply_msg["oob"]["desc"] = CAT_IMAGE_DESC
        reply_msg.send()


if __name__ == "__main__":
    config = configparser.ConfigParser()
    with open("bot.ini") as config_file:
        config.read_file(config_file)
    login = config["Login"]
    # "channels" is a ';'-separated list; drop blanks such as a trailing ';'.
    rooms = [room.strip() for room in login["channels"].split(";") if room.strip()]
    bot = PreviewBot(login["jid"], login["password"], login["nick"], rooms)
    bot.connect()
    print("Connected")
    asyncio.get_event_loop().run_forever()


# ---- urlpreview.py ----
# (diff header in the dump: new file mode 100644, index 0000000..50cb0bb)

import aiohttp
import html.parser
import asyncio


class TitleExtractor(html.parser.HTMLParser):
    """Incremental HTML parser that records the text of ``<title>`` elements.

    Feed HTML with ``feed()``.  After a ``</title>`` end tag has been parsed,
    ``latest_title`` holds that title's text with runs of whitespace collapsed
    to single spaces; until then it is ``None``.
    """

    def __init__(self):
        super().__init__()
        self.current_data = None  # most recent text node (kept for callers)
        self.latest_title = None
        self._in_title = False
        self._title_parts = []

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True
            self._title_parts = []

    def handle_data(self, data):
        self.current_data = data
        # Title text may arrive in several pieces (e.g. split across feed()
        # calls), so collect all of them instead of keeping only the last.
        if self._in_title:
            self._title_parts.append(data)

    def handle_endtag(self, tag):
        if tag == "title" and self._in_title:
            self._in_title = False
            self.latest_title = " ".join("".join(self._title_parts).split())
async def get_preview(url, *, timeout=10.0):
    """Return the HTML ``<title>`` of ``url``, or ``None`` if there is none.

    The body is streamed in 4096-byte chunks and parsing stops as soon as a
    title has been found, so the rest of the page is not downloaded.

    Args:
        url: Address of the page to fetch.
        timeout: Total number of seconds allowed for the whole request.

    Returns:
        The title text, or ``None`` when the response is not ``text/html``,
        contains no title, or the request fails or times out.
    """
    # Imported here because this module's import block is outside this block.
    import codecs

    headers = {"user-agent": "Mozilla/5.0 XmppPreviewer/1.0"}
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    try:
        # "async with" closes the response and the session even when reading
        # or parsing raises.
        async with aiohttp.ClientSession(
            headers=headers, timeout=client_timeout
        ) as session:
            async with session.get(url) as resp:
                print(url, resp.status)
                content_type = resp.headers.get("content-type", "")
                if not content_type.lower().startswith("text/html"):
                    return None

                # Use the charset declared by the server and fall back to
                # UTF-8 when it is missing or unknown to Python.
                try:
                    decoder_factory = codecs.getincrementaldecoder(
                        resp.charset or "utf-8"
                    )
                except LookupError:
                    decoder_factory = codecs.getincrementaldecoder("utf-8")
                # An incremental decoder copes with multi-byte characters cut
                # in half at a chunk boundary; errors="replace" keeps
                # malformed pages from raising.
                decoder = decoder_factory(errors="replace")

                parser = TitleExtractor()
                while chunk := await resp.content.read(4096):
                    parser.feed(decoder.decode(chunk))
                    if parser.latest_title:
                        return parser.latest_title
                return None
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        # Callers treat None as "no preview available".
        print(url, "failed:", repr(exc))
        return None


if __name__ == "__main__":
    async def main():
        print(await get_preview("https://youtu.be/YUMaoIt1rDU"))

    asyncio.run(main())