Test cat images
commit 4fd3d49af1
3 changed files with 131 additions and 0 deletions
.gitignore (vendored, new file, +2)
@@ -0,0 +1,2 @@
bot.ini
__pycache__
previewbot.py (new file, +90)
@@ -0,0 +1,90 @@
import configparser
import slixmpp
from slixmpp.exceptions import IqError, IqTimeout
import re
import urlpreview
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

# Match http(s) URLs up to the next whitespace or '#'.
url_regex = re.compile(r"https?://[^\s#]+")


class PreviewBot(slixmpp.ClientXMPP):
    def __init__(self, jid, password, nick, rooms):
        super().__init__(jid, password)

        self.nick = nick
        self.rooms = rooms

        self.register_plugin('xep_0030')  # Service Discovery
        self.register_plugin('xep_0045')  # Multi-User Chat
        self.register_plugin('xep_0199')  # XMPP Ping
        self.register_plugin("xep_0066")  # Out of band data

        self.add_event_handler("session_start", self.on_session_start)
        self.add_event_handler("message", self.on_message)
        self.add_event_handler("groupchat_message", self.on_muc_message)

    def on_session_start(self, event):
        self.send_presence()
        for room in self.rooms:
            self.plugin["xep_0045"].join_muc(room, self.nick)

    async def on_message(self, msg):
        # Direct (one-to-one) messages; ignore anything sent from our own account.
        if msg["type"] in ("chat", "normal") and msg["from"].bare != self.boundjid.bare:
            url_matches = url_regex.findall(msg["body"])
            if url_matches:
                print("Fetching previews for:", url_matches)
                # Fetch all previews concurrently.
                fetch_tasks = [urlpreview.get_preview(url) for url in url_matches]
                previews = await asyncio.gather(*fetch_tasks)
                preview_lines = []
                for url, preview_resp in zip(url_matches, previews):
                    if preview_resp is None:
                        preview_lines.append(f"Could not fetch preview for {url}")
                    else:
                        preview_lines.append(f"Title: {preview_resp}")
                preview_text = "\n".join(preview_lines)
                msg.reply(preview_text).send()
                return
            if "kuschelkatze" in msg["body"]:
                # Attach the cat picture as Out of Band Data (XEP-0066) so clients can show it inline.
                reply_msg = msg.reply()
                reply_msg["body"] = "http://bastiodon.lan/gabriel/ba8af1391c85f763.jpeg"
                reply_msg["oob"]["url"] = "http://bastiodon.lan/gabriel/ba8af1391c85f763.jpeg"
                reply_msg["oob"]["desc"] = "Katzenbild"
                reply_msg.send()

    async def on_muc_message(self, msg):
        # Group chat messages. The MUC reflects our own messages back, so compare
        # the sender's room nick against our own nick to avoid replying to ourselves.
        if msg["type"] == "groupchat" and msg["mucnick"] != self.nick:
            url_matches = url_regex.findall(msg["body"])
            if url_matches:
                print("Fetching previews for:", url_matches)
                fetch_tasks = [urlpreview.get_preview(url) for url in url_matches]
                previews = await asyncio.gather(*fetch_tasks)
                preview_lines = []
                for url, preview_resp in zip(url_matches, previews):
                    if preview_resp is None:
                        preview_lines.append(f"Could not fetch preview for {url}")
                    else:
                        preview_lines.append(f"Title: {preview_resp}")
                preview_text = "\n".join(preview_lines)
                msg.reply(preview_text).send()
                return
            if "kuschelkatze" in msg["body"]:
                reply_msg = msg.reply()
                reply_msg["body"] = "http://bastiodon.lan/gabriel/ba8af1391c85f763.jpeg"
                reply_msg["oob"]["url"] = "http://bastiodon.lan/gabriel/ba8af1391c85f763.jpeg"
                reply_msg["oob"]["desc"] = "Katzenbild"
                reply_msg.send()


if __name__ == "__main__":
    config = configparser.ConfigParser()
    with open("bot.ini") as config_file:
        config.read_file(config_file)

    bot = PreviewBot(
        config["Login"]["jid"],
        config["Login"]["password"],
        config["Login"]["nick"],
        config["Login"]["channels"].split(";"),
    )
    bot.connect()
    print("Connected")
    asyncio.get_event_loop().run_forever()
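For reference, a minimal sketch of the bot.ini that previewbot.py reads in its __main__ block (kept out of the repository by the .gitignore above). The section and key names come from the code; the values are placeholders:

[Login]
jid = previewbot@example.org
password = changeme
nick = previewbot
channels = room1@conference.example.org;room2@conference.example.org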
urlpreview.py (new file, +39)
@@ -0,0 +1,39 @@
import aiohttp
import html.parser
import asyncio


class TitleExtractor(html.parser.HTMLParser):
    # Keeps track of the most recent text node so the contents of <title>
    # can be grabbed when the closing tag is seen.

    def __init__(self):
        super().__init__()
        self.current_data = None
        self.latest_title = None

    def handle_data(self, data):
        self.current_data = data

    def handle_endtag(self, tag):
        if tag == "title":
            self.latest_title = self.current_data


async def get_preview(url):
    # Identify ourselves with a browser-like user agent; some sites reject unknown clients.
    session = aiohttp.ClientSession(
        headers={"User-Agent": "Mozilla/5.0 XmppPreviewer/1.0"}
    )
    resp = await session.get(url)
    title = None
    print(url, resp.status)
    if resp.headers.get("Content-Type", "").startswith("text/html"):
        parser = TitleExtractor()
        # Read the body in chunks and stop as soon as a <title> has been parsed,
        # so large pages are not downloaded in full.
        while chunk := await resp.content.read(4096):
            parser.feed(chunk.decode("utf-8", errors="replace"))  # assume UTF-8
            if parser.latest_title:
                title = parser.latest_title
                break
    resp.close()
    await session.close()
    return title


if __name__ == "__main__":
    async def main():
        print(await get_preview("https://youtu.be/YUMaoIt1rDU"))

    asyncio.run(main())