68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
import slixmpp
|
|
from slixmpp.exceptions import IqError, IqTimeout
|
|
import re
|
|
import urlpreview
|
|
import asyncio
|
|
import logging
|
|
import configparser
|
|
|
|
url_regex = re.compile(r"https?://[^\s#]+")
|
|
|
|
class PreviewBot(slixmpp.ClientXMPP):
|
|
def __init__(self, jid, password, nick, rooms):
|
|
super().__init__(jid, password)
|
|
|
|
self.nick = nick
|
|
self.rooms = rooms
|
|
|
|
self.register_plugin('xep_0030') # Service Discovery
|
|
self.register_plugin('xep_0045') # Multi-User Chat
|
|
self.register_plugin('xep_0199') # XMPP Ping
|
|
self.register_plugin("xep_0066") # Out of band data
|
|
|
|
self.add_event_handler("session_start", self.on_session_start)
|
|
self.add_event_handler("message", self.on_message)
|
|
self.add_event_handler("groupchat_message", self.on_muc_message)
|
|
|
|
def on_session_start(self, event):
|
|
self.send_presence()
|
|
for room in self.rooms:
|
|
self.plugin["xep_0045"].join_muc(room, self.nick)
|
|
|
|
async def on_message(self, msg):
|
|
if msg["type"] in ("chat", "normal") and msg["from"] != self.nick:
|
|
await self.handle_msg(msg)
|
|
|
|
async def on_muc_message(self, msg):
|
|
if msg["type"] == "groupchat" and msg["from"] != self.nick:
|
|
await self.handle_msg(msg)
|
|
|
|
async def handle_msg(self, msg):
|
|
url_matches = url_regex.findall(msg["body"])
|
|
if url_matches:
|
|
print("Fetching previews for:", url_matches)
|
|
fetch_tasks = [urlpreview.get_preview(url) for url in url_matches]
|
|
previews = await asyncio.gather(*fetch_tasks)
|
|
preview_lines = []
|
|
for url, preview_resp in zip(url_matches, previews):
|
|
if preview_resp is None:
|
|
preview_lines.append(f"Could not fetch preview for {url}")
|
|
else:
|
|
preview_lines.append(f"Title: {preview_resp}")
|
|
preview_text = "\n".join(preview_lines)
|
|
msg.reply(preview_text).send()
|
|
|
|
if __name__ == "__main__":
|
|
config = configparser.ConfigParser()
|
|
config.read_file(open("bot.ini"))
|
|
if "Logging" in config:
|
|
logging.basicConfig(level=int(config["Logging"]["loglevel"]))
|
|
bot = PreviewBot(
|
|
config["Login"]["jid"],
|
|
config["Login"]["password"],
|
|
config["Login"]["nick"],
|
|
config["Login"]["channels"].split(";")
|
|
)
|
|
bot.connect()
|
|
print("Connected")
|
|
asyncio.get_event_loop().run_forever()
|