# Listing metadata: 215 lines, 7.7 KiB, Python
import asyncio
import contextlib
import io
import re
import traceback

import discord

from bot.cogs.cog_manager import BaseCog
from utils.time_parser import parse_time


class UtilityCog(BaseCog):
    """Utility commands: reminders, code formatting, eval, clean-up and export."""

    # NOTE(review): the per-command docstrings are kept in their original
    # short "summary / Usage / Example" form in case BaseCog surfaces them as
    # help text -- confirm before expanding them. Engineering notes therefore
    # live in comments.

    # Discord rejects message content longer than this many characters.
    _MAX_MESSAGE_LENGTH = 2000

    def __init__(self, bot):
        super().__init__(bot)
        # Strong references to pending reminder tasks. The event loop keeps
        # only weak references, so a task nobody holds on to can be
        # garbage-collected before it completes.
        self._reminder_tasks = set()

    @classmethod
    def _code_block(cls, text, prefix="", keep_end=False):
        # Wrap ``text`` in a fenced code block (optionally preceded by
        # ``prefix``) and trim it so the whole message fits Discord's limit.
        # ``keep_end=True`` keeps the tail instead of the head, which is the
        # useful part of a traceback.
        opening = f"{prefix}```\n"
        closing = "\n```"
        room = cls._MAX_MESSAGE_LENGTH - len(opening) - len(closing)
        text = str(text)
        if len(text) > room:
            notice = "... (truncated)"
            keep = room - len(notice) - 1
            if keep_end:
                text = f"{notice}\n{text[-keep:]}"
            else:
                text = f"{text[:keep]}\n{notice}"
        return f"{opening}{text}{closing}"

    async def cmd_remindme(self, message):
        """
        Set a reminder
        Usage: .remindme <time> <text>
        Example: .remindme 5m Check the oven
        """
        content = message.content.strip()
        # DOTALL: without it ``.*`` stops at the first newline and the rest of
        # a multi-line reminder is silently dropped.
        match = re.match(r'\.remindme\s+(\d+[smhd])\s+(.*)', content, re.DOTALL)

        if not match:
            await message.edit(content="❌ Usage: `.remindme <time> <text>`\nExample: `.remindme 5m Check the oven`")
            return

        time_str, reminder_text = match.groups()

        # parse_time's result is handed straight to asyncio.sleep below, so it
        # is presumably a number of seconds -- TODO confirm against the helper.
        duration = parse_time(time_str)
        if not duration:
            await message.edit(content="❌ Invalid time format. Use numbers followed by s (seconds), m (minutes), h (hours), or d (days).")
            return

        await message.edit(content=f"✅ I'll remind you in {time_str}: {reminder_text}")

        async def reminder_task():
            await asyncio.sleep(duration)
            # '@' is neutralised so the reminder text cannot ping anyone.
            reminder = f"{message.author.mention} Reminder: {reminder_text.replace('@', 'at')}"
            try:
                await message.reply(reminder)
            except discord.HTTPException:
                # The command message may have been deleted while we slept, in
                # which case it can no longer be replied to.
                await message.channel.send(reminder)

        # Keep a reference until the task finishes (see __init__).
        task = asyncio.create_task(reminder_task())
        self._reminder_tasks.add(task)
        task.add_done_callback(self._reminder_tasks.discard)

    async def cmd_fmt(self, message):
        """
        Format text nicely (code blocks, etc)
        Usage: .fmt <language> <code>
        Example: .fmt py print("Hello World")
        """
        content = message.content.strip()
        # maxsplit=2 leaves the code untouched; a plain split()/join would
        # collapse its indentation and newlines.
        parts = content.split(None, 2)
        if len(parts) < 3:
            await message.edit(content="❌ Usage: `.fmt <language> <code>`")
            return

        _, lang, code = parts

        # Fenced block with a language tag so Discord syntax-highlights it.
        formatted = f"```{lang}\n{code}\n```"
        await message.edit(content=formatted)

    async def cmd_eval(self, message):
        """
        Evaluate Python code (dangerous, use with caution)
        Usage: .eval <code>
        """
        content = message.content.strip()
        # Split on any whitespace so code that starts on the next line is
        # accepted as well.
        parts = content.split(None, 1)
        if len(parts) < 2:
            await message.edit(content="❌ Usage: `.eval <code>`")
            return

        code = parts[1]

        # SECURITY: exec() runs arbitrary code with the full privileges of
        # this process -- nothing here is sandboxed. This command must only
        # ever be reachable by a fully trusted user.
        #
        # One namespace serves as the function's globals: names looked up
        # inside __eval_func__ resolve through its globals, so the helpers
        # have to live there to be visible to the evaluated code.
        env = dict(globals())
        env.update(bot=self.bot, message=message, discord=discord, asyncio=asyncio)

        try:
            # A bare single-line snippet is turned into ``return <snippet>``
            # so its value gets reported.
            if not code.strip().startswith("return ") and "\n" not in code:
                code = f"return {code}"

            # Wrap in an async function so the snippet may use ``await``.
            indented = code.replace("\n", "\n    ")
            exec(f"async def __eval_func__():\n    {indented}", env)

            # Await directly: we are already on the event loop, and blocking
            # on a future scheduled onto this same loop would never return.
            result = await env["__eval_func__"]()

            if result is None:
                await message.edit(content="✅ Code executed successfully (no output)")
            else:
                await message.edit(content=self._code_block(result))

        except Exception:
            # Deliberately broad: whatever the snippet raises is reported back
            # to the user instead of propagating.
            error = traceback.format_exc()
            await message.edit(content=self._code_block(error, prefix="❌ Error:\n", keep_end=True))

    async def cmd_delrecent(self, message):
        """
        Delete recent messages from the user
        Usage: .delrecent <count>
        """
        parts = message.content.strip().split()

        if len(parts) != 2:
            await message.edit(content="❌ Usage: `.delrecent <count>`")
            return

        try:
            count = int(parts[1])
        except ValueError:
            await message.edit(content="❌ Count must be a number")
            return

        if not 1 <= count <= 100:  # Discord limitation
            await message.edit(content="❌ Count must be between 1 and 100")
            return

        await message.edit(content=f"🗑️ Deleting {count} recent messages...")

        deleted = 0
        async for msg in message.channel.history(limit=200):
            if msg.author != self.bot.user:
                continue
            # The command message is removed at the end and does not count
            # toward ``count``; otherwise ".delrecent 1" would delete nothing
            # but itself.
            if msg.id == message.id:
                continue
            # A message that vanished in the meantime is skipped, not fatal.
            with contextlib.suppress(discord.NotFound):
                await msg.delete()
                deleted += 1
            if deleted >= count:
                break
            await asyncio.sleep(0.5)  # Avoid rate limiting

        with contextlib.suppress(discord.NotFound):
            await message.delete()

        # The command message is gone, so the summary goes out as a new one.
        await message.channel.send(f"✅ Deleted {deleted} message(s)", delete_after=5)

    async def cmd_nuke_server(self, message):
        """
        Nuke a server (dangerous)
        Usage: .nuke
        """
        # WARNING: destructive and irreversible. Deletes every channel of the
        # guild and kicks its members. NOTE(review): consider removing this
        # command altogether.
        #
        # Authorization: only the guild owner may trigger this, and never
        # from outside a guild (message.guild is None in DMs).
        guild = message.guild
        if guild is None or guild.owner_id != message.author.id:
            await message.edit(content="❌ Only the server owner can nuke a server.")
            return

        try:
            confirmation = await message.channel.send("⚠️ Are you sure you want to nuke this server? Reply 'yes' to confirm.")

            def check(m):
                # The "yes" must come from the invoker in the same channel; a
                # "yes" typed elsewhere must not confirm this prompt.
                return (
                    m.author == message.author
                    and m.channel == message.channel
                    and m.content.lower() == "yes"
                )

            try:
                await self.bot.wait_for('message', check=check, timeout=15.0)
            except asyncio.TimeoutError:
                await message.edit(content="❌ Nuke cancelled.")
                await confirmation.delete()
                return

            await confirmation.delete()
            await message.edit(content="💣 Nuking server...")

            for channel in message.guild.channels:
                await channel.delete()

            for member in message.guild.members:
                if member != self.bot.user and member != message.guild.owner:
                    await member.kick(reason="Server nuke")

            await message.edit(content="💥 Server nuked successfully")

        except Exception as e:
            await message.edit(content=f"❌ Error nuking server: {str(e)}")

    async def cmd_savechannel(self, message):
        """
        Save a channel's messages to a file
        Usage: .savechannel [limit]
        """
        parts = message.content.strip().split()

        limit = 100  # Default limit
        if len(parts) >= 2:
            try:
                limit = int(parts[1])
            except ValueError:
                await message.edit(content="❌ Limit must be a number")
                return
            if limit < 1:
                await message.edit(content="❌ Limit must be positive")
                return

        await message.edit(content=f"📥 Saving the last {limit} messages...")

        lines = []
        async for msg in message.channel.history(limit=limit):
            timestamp = msg.created_at.strftime("%Y-%m-%d %H:%M:%S")
            text = msg.content or "[No Text Content]"

            attachments = ""
            if msg.attachments:
                attachments = f" [Attachments: {', '.join(a.url for a in msg.attachments)}]"

            # str(user) lets the library pick the display form, avoiding a
            # meaningless "#0" suffix for accounts without a discriminator.
            lines.append(f"[{timestamp}] {msg.author}: {text}{attachments}")

        # The log is reversed so the file reads in chronological order.
        lines.reverse()

        # Not every channel type has a name (e.g. DMs); fall back to the id.
        channel_name = getattr(message.channel, "name", None) or message.channel.id
        payload = "\n".join(lines).encode("utf-8")
        file = discord.File(io.BytesIO(payload), filename=f"channel_{channel_name}.txt")

        await message.delete()
        # Report what was actually exported, which can be fewer than ``limit``.
        await message.channel.send(f"📁 Here are the last {len(lines)} messages from this channel:", file=file)