151 lines
6.4 KiB
Python
151 lines
6.4 KiB
Python
import asyncio
|
|
import traceback
|
|
import datetime
|
|
import os
|
|
import ast
|
|
import gzip
|
|
import json
|
|
from utils.time_parser import parse_time
|
|
from config import DOWNLOADS_DIR
|
|
|
|
class UtilityCommands:
    """Utility commands that act on the chat message which invoked them.

    Every ``cmd_*`` coroutine receives the triggering message object and
    responds by replying to, editing, or deleting it. The message API used
    here (``reply``, ``edit``, ``delete``, ``channel.history`` ...) looks like
    discord.py's -- NOTE(review): confirm which client library is in use.
    """

    def __init__(self, bot):
        """Store the owning bot.

        Args:
            bot: Object exposing ``loaded_commands`` (a mapping merged into
                the ``cmd_eval`` namespace) and ``user`` (compared against
                message authors in ``cmd_delrecent``).
        """
        self.bot = bot
        # Strong references to pending reminder tasks. The event loop only
        # keeps weak references to tasks, so a task nobody else references
        # may be garbage collected before it completes.
        self._reminder_tasks = set()

    async def cmd_remindme(self, message):
        """Set a reminder.

        The message content is split on single spaces into
        ``<command> <time> <text>``. ``<time>`` is handed to ``parse_time``;
        its result is passed to ``asyncio.sleep``, so it is presumably a
        number of seconds -- confirm against ``utils.time_parser``.

        Replies with a usage hint when arguments are missing and with a
        format hint when ``parse_time`` returns ``None``.
        """
        parts = message.content.split(" ", 2)
        if len(parts) < 3:
            await message.reply("Usage: `.remindme <time> <message>`", silent=True)
            return

        duration = parse_time(parts[1])
        if duration is None:
            await message.reply("Invalid time format. Example: `4m2s`, `1h30m`.", silent=True)
            return

        # "@" is replaced so the echoed text cannot itself contain mentions.
        safe_text = parts[2].replace("@", "at")
        await message.reply(f"Reminder set for {parts[1]}.", silent=True)

        async def reminder_task():
            await asyncio.sleep(duration)
            await message.reply(f"{message.author.mention} Reminder: {safe_text}")

        task = asyncio.create_task(reminder_task())
        # Keep the task alive until it finishes, then drop the reference.
        self._reminder_tasks.add(task)
        task.add_done_callback(self._reminder_tasks.discard)

    async def cmd_fmt(self, message):
        """Format a string using f-string.

        Everything after the first five characters of the message content is
        evaluated as the body of a single-quoted f-string and the message is
        edited to the result (the edit is given five seconds to complete).

        Raises:
            RuntimeWarning: chained from any failure, after the error text
                has been sent to the channel and both that notice and the
                invoking message have been deleted.
        """
        try:
            # SECURITY(review): this evaluates message content as Python with
            # access to module globals and this frame's locals (``self``,
            # ``message``). It must only ever be reachable by a trusted
            # author. Kept as-is because the command's purpose is arbitrary
            # f-string evaluation.
            formatted = eval(f"f'{message.content[5:]}'", globals(), locals())
            print(formatted)
            # ``content=`` keyword, consistent with every other edit call in
            # this class.
            await asyncio.wait_for(message.edit(content=formatted), timeout=5)
        except Exception as e:
            flashdel = await message.channel.send(f"Error: {e}", silent=True)
            await message.delete()
            await flashdel.delete()
            raise RuntimeWarning from e

    async def cmd_eval(self, message):
        """Evaluate Python code.

        Everything after the first six characters of the message content is
        compiled as the body of an ``async def`` and awaited. The code runs
        with these globals: ``msg`` (the message), ``asyncio``, ``random``,
        every entry of ``self.bot.loaded_commands``, and ``out`` (replies to
        the message with ``silent=True``).

        Calls written as a bare name that refers to a coroutine function in
        that namespace are awaited automatically; calls that are already
        awaited are left alone. ``out`` is a plain lambda returning an
        awaitable, so it is not auto-awaited -- write ``await out(...)``.

        On failure the message is edited to the traceback. The message is
        deleted afterwards in every case.
        """
        # Imported locally: only needed to populate the eval namespace.
        import random

        try:
            source = message.content[6:]
            print(repr(source))

            # SECURITY(review): arbitrary code execution on message content;
            # must only be reachable by a trusted author.
            exec_scope = {
                "msg": message,
                "asyncio": asyncio,
                "random": random,
                **self.bot.loaded_commands,
                "out": lambda content: message.reply(content, silent=True),
            }

            coroutine_names = {
                name
                for name, value in exec_scope.items()
                if asyncio.iscoroutinefunction(value)
            }

            # Rewrite on the syntax tree rather than the source text: a
            # textual replace double-awaits repeated or already-awaited
            # calls and also rewrites string literals and longer names.
            module = _AutoAwaitTransformer(coroutine_names).visit(ast.parse(source))

            wrapper = ast.parse("async def __eval():\n    pass")
            # An empty snippet keeps the placeholder ``pass`` body.
            wrapper.body[0].body = module.body or [ast.Pass()]
            ast.fix_missing_locations(wrapper)

            exec(compile(wrapper, "<eval>", "exec"), exec_scope)
            await exec_scope["__eval"]()
        except Exception:
            await message.edit(content=traceback.format_exc())
        finally:
            await message.delete()

    async def cmd_nuke_server(self, message):
        """Delete every channel of the guild the message was sent in.

        WARNING: destructive and irreversible. Channels are deleted one at a
        time; the first failure stops the loop and the traceback is written
        into the invoking message, which is then deleted in every case.

        NOTE(review): there is no confirmation step and no permission or
        guild allow-list check here. Strongly consider removing this command
        or gating it behind an explicit confirmation.
        """
        try:
            for channel in message.guild.channels:
                await channel.delete()
        except Exception:
            await message.edit(content=traceback.format_exc())
        finally:
            await message.delete()

    async def cmd_delrecent(self, message):
        """Delete recent messages.

        The text after the first space is parsed as a number of minutes.
        Among the messages yielded by ``channel.history(limit=100)``, those
        authored by ``self.bot.user`` and created at or after
        ``now - minutes`` are deleted. A status (or error) message is sent
        with ``delete_after=0``.
        """
        try:
            minutes = float(message.content.split(" ", 1)[1])
            cutoff_time = datetime.datetime.now(datetime.UTC) - datetime.timedelta(minutes=minutes)

            deleted = 0
            async for msg in message.channel.history(limit=100):
                if msg.author == self.bot.user and msg.created_at >= cutoff_time:
                    await msg.delete()
                    deleted += 1

            await message.channel.send(f"Deleted {deleted} messages.", delete_after=0, silent=True)
        except Exception as e:
            # Also covers a missing or non-numeric argument.
            await message.channel.send(f"Error: {e}", delete_after=0, silent=True)

    async def cmd_savechannel(self, message):
        """Save all messages in a channel to a file.

        Collects the channel's full history (``limit=None``), serialises each
        message to a plain dict and writes the list as gzip-compressed JSON to
        ``DOWNLOADS_DIR/<channel id>.json.gz``. The invoking message is edited
        to report the path, or the error text on failure.
        """
        try:
            messages = [
                self._serialize_message(msg)
                async for msg in message.channel.history(limit=None)
            ]

            filename = f"{message.channel.id}.json.gz"
            os.makedirs(DOWNLOADS_DIR, exist_ok=True)
            filepath = os.path.join(DOWNLOADS_DIR, filename)

            # Compression and JSON encoding are synchronous; run them in a
            # worker thread so the event loop is not blocked meanwhile.
            await asyncio.to_thread(self._write_archive, filepath, messages)

            await message.edit(content=f"Saved messages to `{filepath}`")
        except Exception as e:
            await message.edit(content=f"Error: {e}")

    @staticmethod
    def _serialize_message(msg):
        """Return a JSON-serialisable dict describing one message."""
        author = msg.author
        reference = msg.reference
        return {
            "id": msg.id,
            "content": msg.content,
            "created_at": msg.created_at.isoformat() if msg.created_at else None,
            "edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
            "author": {
                "id": author.id,
                "name": author.name,
                "discriminator": author.discriminator,
                "bot": author.bot,
            } if author else None,
            "channel_id": msg.channel.id,
            # ``or ()`` keeps the tolerance for falsy (empty or None)
            # collections.
            "attachments": [
                {"filename": a.filename, "url": a.url} for a in (msg.attachments or ())
            ],
            "embeds": [embed.to_dict() for embed in (msg.embeds or ())],
            "reactions": [
                {"emoji": str(r.emoji), "count": r.count} for r in (msg.reactions or ())
            ],
            "mentions": [user.id for user in (msg.mentions or ())],
            "role_mentions": [role.id for role in (msg.role_mentions or ())],
            "pinned": msg.pinned,
            "tts": msg.tts,
            "type": str(msg.type),
            "reference": {
                "message_id": reference.message_id,
                "channel_id": reference.channel_id,
                "guild_id": reference.guild_id,
            } if reference else None,
        }

    @staticmethod
    def _write_archive(filepath, messages):
        """Write ``messages`` to ``filepath`` as gzip-compressed UTF-8 JSON."""
        with gzip.open(filepath, "wt", encoding="utf-8") as f:
            json.dump(messages, f, ensure_ascii=False, indent=4)


class _AutoAwaitTransformer(ast.NodeTransformer):
    """Wrap bare-name calls to known coroutine functions in ``await``."""

    def __init__(self, coroutine_names):
        # Names whose call results must be awaited.
        self._coroutine_names = coroutine_names

    def visit_Await(self, node):
        """Leave an explicitly awaited call alone, but rewrite its operands."""
        if isinstance(node.value, ast.Call):
            # Visit the call's children directly so the call itself is not
            # wrapped in a second ``await``.
            self.generic_visit(node.value)
        else:
            self.generic_visit(node)
        return node

    def visit_Call(self, node):
        """Await ``name(...)`` when ``name`` is a known coroutine function."""
        self.generic_visit(node)
        callee = node.func
        if isinstance(callee, ast.Name) and callee.id in self._coroutine_names:
            return ast.copy_location(ast.Await(value=node), node)
        return node

    def _leave_untouched(self, node):
        """Skip scopes in which ``await`` would be a syntax error."""
        return node

    visit_FunctionDef = visit_Lambda = visit_ClassDef = _leave_untouched