403 lines
17 KiB
Python
403 lines
17 KiB
Python
import random
|
|
import discord
|
|
import asyncio
|
|
import traceback
|
|
import random
|
|
import datetime
|
|
import os
|
|
import importlib.util
|
|
import ast
|
|
import websockets
|
|
import json
|
|
import difflib
|
|
import io
|
|
import gzip
|
|
import re
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables (such as TOKEN, read at the bottom of this file) from a .env file.
load_dotenv()
|
|
|
|
time_regex = re.compile(r'(\d+)([smhd])')  # Matches one "<amount><unit>" pair, e.g. the "4m" in "4m2s".

# A valid duration is one or more "<amount><unit>" pairs and nothing else.
_FULL_DURATION_REGEX = re.compile(r'(?:\d+[smhd])+')

# Number of seconds represented by each unit suffix.
_SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}


def parse_time(time_str):
    """Convert a compact duration such as ``4m2s`` or ``1h30m`` into seconds.

    The whole string must consist of ``<amount><unit>`` pairs, where the unit
    is one of ``s``, ``m``, ``h`` or ``d``. Input containing anything else
    (for example ``1h30`` or ``5x3m``) is rejected instead of being partially
    parsed, so a typo cannot silently produce a shorter duration.

    Args:
        time_str: The duration text to parse. Surrounding whitespace is ignored.

    Returns:
        The total number of seconds, or ``None`` when the text is not a valid
        duration or adds up to zero seconds.
    """
    candidate = time_str.strip()
    if not _FULL_DURATION_REGEX.fullmatch(candidate):
        return None

    total_seconds = sum(
        int(amount) * _SECONDS_PER_UNIT[unit]
        for amount, unit in time_regex.findall(candidate)
    )
    return total_seconds if total_seconds > 0 else None
|
|
|
|
# Directory holding one Python file per saved command; created on import if missing.
COMMANDS_DIR = "commands"
os.makedirs(COMMANDS_DIR, exist_ok=True)


def load_commands():
    """Import every ``*.py`` file in ``COMMANDS_DIR`` and collect its ``run`` callable.

    Files are processed in sorted order so the resulting mapping is
    deterministic. A file that fails to import (syntax error, exception at
    import time, ...) is reported on the console and skipped, so that one
    broken command cannot prevent the remaining commands from loading.

    Returns:
        A dict mapping the command name (file name without ``.py``) to the
        module's ``run`` attribute. Modules without a callable ``run`` are
        left out.
    """
    commands = {}
    for filename in sorted(os.listdir(COMMANDS_DIR)):
        if not filename.endswith(".py"):
            continue

        cmd_name = filename[:-3]
        cmd_path = os.path.join(COMMANDS_DIR, filename)

        try:
            spec = importlib.util.spec_from_file_location(cmd_name, cmd_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
        except Exception:
            # Boundary around arbitrary user-written code: log and keep going.
            print(f"Failed to load command {cmd_name!r} from {cmd_path}:")
            traceback.print_exc()
            continue

        run = getattr(module, "run", None)
        if callable(run):
            commands[cmd_name] = run
    return commands
|
|
# JSON file in which the IDs of tracked channels are stored.
TRACKED_CHANNELS_FILE = "tracked_channels.json"


def load_tracked_channels():
    """Return the channel IDs stored in ``TRACKED_CHANNELS_FILE``.

    Returns:
        The decoded JSON content of the file, or an empty list when the file
        does not exist.
    """
    try:
        handle = open(TRACKED_CHANNELS_FILE, 'r')
    except FileNotFoundError:
        return []
    with handle:
        return json.load(handle)


def save_tracked_channels(tracked_channels):
    """Overwrite ``TRACKED_CHANNELS_FILE`` with *tracked_channels* encoded as JSON.

    Args:
        tracked_channels: The JSON-serialisable collection of channel IDs to store.
    """
    encoded = json.dumps(tracked_channels)
    with open(TRACKED_CHANNELS_FILE, 'w') as handle:
        handle.write(encoded)
|
|
|
|
# IDs of users whose public commands are refused.
BLACKLISTED_USER_IDS = frozenset({696800726084747314})


async def handle_blacklist(message):
    """Refuse a command when its author is blacklisted.

    The check compares the author's numeric ID; comparing the author object
    itself against an integer never matches.

    Args:
        message: The incoming message. Its ``author.id`` is inspected and
            ``reply`` is awaited when the author is blacklisted.

    Returns:
        ``True`` when the author is blacklisted (a "no" reply has been sent and
        the caller should stop handling the command), otherwise ``False``.
    """
    if message.author.id not in BLACKLISTED_USER_IDS:
        return False
    await message.reply("no")
    return True
|
|
|
|
class Selfbot(discord.Client):
    """Discord client that reacts to dot-prefixed chat commands.

    ``.remindme`` and ``.rps`` are handled for every author (subject to
    ``handle_blacklist``). All other commands are only handled when the
    message was written by the logged-in account itself.
    """

    # Messages by this author that mention the ID below get a canned reply.
    _AUTO_REPLY_AUTHOR_ID = 1169111190824308768
    _AUTO_REPLY_MENTION = "<@1236667927944761396>"
    # Every message by this author receives a bucket reaction.
    _BUCKET_REACTION_USER_ID = 1341423498618208257
    # Messages by this author are deleted on sight (best effort).
    _AUTO_DELETE_USER_ID = 1351739454141763626
    # Authors whose edits/deletions are never echoed in tracked channels.
    _LOG_WHITELIST = frozenset({627566973869359104})
    # User and channel used by on_presence_update.
    _PRESENCE_WATCH_USER_ID = 627566973869359104
    _PRESENCE_CHANNEL_ID = 1302691935152246851

    # Rock-paper-scissors tables: 1 = rock, 2 = paper, 3 = scissors.
    _RPS_IDS = {"r": 1, "p": 2, "s": 3}
    _RPS_NAMES = {1: "rock", 2: "paper", 3: "scissors"}
    _RPS_BEATS = {1: 3, 2: 1, 3: 2}  # key beats value

    def __init__(self):
        super().__init__()
        self.default_status = None
        self.loaded_commands = load_commands()
        self.tracked_channels = load_tracked_channels()
        self.last_status = {}
        self.AFK_STATUS = False  # Whether AFK mode is currently enabled
        self.AFK_NOTIFIED_USERS = []  # IDs of users already told about the AFK status
        self.horsin = []  # IDs of channels in which every message gets a horse reaction
        # Strong references to running reminder tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks = set()
        # Owner-only commands as (prefix, handler) pairs. No prefix is a prefix
        # of another one, so at most one entry matches a given message.
        self._owner_commands = (
            (".horse", self._cmd_horse),
            (".afk", self._cmd_afk),
            (".unafk", self._cmd_unafk),
            (".fmt ", self._cmd_fmt),
            (".eval ", self._cmd_eval),
            (".addcmd ", self._cmd_addcmd),
            (".delcmd ", self._cmd_delcmd),
            (".listcmds", self._cmd_listcmds),
            (".delrecent ", self._cmd_delrecent),
            (".trackmessages", self._cmd_trackmessages),
            (".untrackmessages", self._cmd_untrackmessages),
            (".savechannel", self._cmd_savechannel),
            (".repeat29", self._cmd_repeat29),
        )

    async def on_ready(self):
        """Print the logged-in user once the client is ready."""
        print(f"Logged in as {self.user}")

    async def handle_afk_dm(self, message):
        """Send the AFK notice to a DM author, at most once per AFK period."""
        if self.AFK_STATUS and message.author.id not in self.AFK_NOTIFIED_USERS:
            await message.reply(
                "Heya, I'm not at my computer right now, if you're requesting something please follow <https://nohello.club>. I'll let you know when I'm back :) \n\n-# This action was automated."
            )
            self.AFK_NOTIFIED_USERS.append(message.author.id)

    async def on_message(self, message):
        """Handle an incoming message.

        Order of processing:
        1. the canned auto-reply,
        2. the public commands ``.remindme`` / ``.rps``,
        3. passive behaviour (AFK notice in DMs, reactions, auto-delete),
        4. owner-only commands, which are skipped for any other author.
        """
        content = message.content

        if message.author.id == self._AUTO_REPLY_AUTHOR_ID and self._AUTO_REPLY_MENTION in content:
            await message.reply("shut the fuck up")

        # --- Public commands -------------------------------------------------
        if content.startswith(".remindme "):
            if await handle_blacklist(message):
                return
            await self._cmd_remindme(message)
        elif content.startswith(".rps "):
            if await handle_blacklist(message):
                return
            if await self._cmd_rps(message):
                return

        # --- Passive behaviour ----------------------------------------------
        # Handle DM if in AFK mode
        if isinstance(message.channel, discord.DMChannel) and message.author != self.user:
            await self.handle_afk_dm(message)

        if message.channel.id in self.horsin:
            await message.add_reaction("🐴")
        if message.author.id == self._BUCKET_REACTION_USER_ID:
            await message.add_reaction("🪣")

        if message.author != self.user:
            if message.author.id == self._AUTO_DELETE_USER_ID:
                try:
                    await message.delete()
                except discord.HTTPException:
                    # Best effort: missing permissions or an already-deleted
                    # message must not break message handling.
                    pass
            # Everything below is reserved for the logged-in account.
            return

        # --- Owner-only commands --------------------------------------------
        for prefix, handler in self._owner_commands:
            if content.startswith(prefix):
                await handler(message)
                break

    # ------------------------------------------------------------------ public

    async def _cmd_remindme(self, message):
        """``.remindme <time> <message>``: reply with the message after the delay."""
        parts = message.content.split(" ", 2)
        if len(parts) < 3:
            await message.reply("Usage: `.remindme <time> <message>`", silent=True)
            return

        duration = parse_time(parts[1])
        if duration is None:
            await message.reply("Invalid time format. Example: `4m2s`, `1h30m`.", silent=True)
            return

        # Replace "@" so the reminder text cannot be used to ping anyone.
        reminder_text = parts[2].replace("@", "at")
        await message.reply(f"Reminder set for {parts[1]}.", silent=True)

        async def reminder_task():
            await asyncio.sleep(duration)
            await message.reply(f"{message.author.mention} Reminder: {reminder_text}")

        # Keep a reference until the task finishes so it cannot be garbage
        # collected while it is still sleeping.
        task = asyncio.create_task(reminder_task())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _cmd_rps(self, message):
        """``.rps <item>``: play one round of rock-paper-scissors.

        Returns:
            ``True`` when ``on_message`` should stop processing the message
            (usage error, easter eggs, invalid choice, tie); ``False`` after a
            decided round, which continues with the passive behaviour.
        """
        parts = message.content.split(" ", 2)
        if len(parts) != 2:
            await message.reply("Usage: `.rps <item>`", silent=True)
            return True

        item = parts[1]
        if item.lower() == "dick":
            await message.reply("Scissors beats dick any day :3", silent=True)  # little easter egg
            return True
        if item == "<@696800726084747314>":
            await message.reply("Head so thick that i would try rock but the rock would break")
            return True

        choice = random.choice([1, 2, 3])

        # Only the first letter of the item decides: r / p / s.
        iid = self._RPS_IDS.get(item.lower()[:1], 0)
        if iid == 0:
            await message.reply("Invalid choice!", silent=True)
            return True

        if choice == iid:
            await message.reply(f"Huh we chose the same thing. Try again!", silent=True)
            return True

        chosen_name = self._RPS_NAMES[choice]
        if self._RPS_BEATS[iid] == choice:
            # NOTE(review): this branch is reached when the caller's item beats
            # the random choice, yet the text announces a win for the replying
            # account. Both branches therefore report the caller as the loser;
            # confirm whether that is intended before changing the wording.
            await message.reply(f"Welp, ggs, I won. I chose `{chosen_name}`", silent=True)
        else:
            await message.reply(f"Oop, you lost. Try again later! I chose `{chosen_name}`", silent=True)
        return False

    # -------------------------------------------------------------- owner-only

    async def _cmd_horse(self, message):
        """``.horse``: toggle the horse reaction for the current channel."""
        channel_id = message.channel.id
        if channel_id in self.horsin:
            self.horsin.remove(channel_id)
            await message.reply("no longer horsin around D:")
        else:
            self.horsin.append(channel_id)
            await message.reply(":D")

    async def _cmd_afk(self, message):
        """``.afk``: enable AFK mode."""
        if self.AFK_STATUS:
            await message.reply("You are already in AFK mode.", silent=True)
            return
        self.AFK_STATUS = True
        await message.reply("k")

    async def _cmd_unafk(self, message):
        """``.unafk``: disable AFK mode and tell every notified user."""
        if not self.AFK_STATUS:
            await message.reply("You are not in AFK mode.", silent=True)
            return

        self.AFK_STATUS = False
        for user_id in self.AFK_NOTIFIED_USERS:
            try:
                user = await self.fetch_user(user_id)  # Fetch user by ID
                if user:
                    dm_channel = await user.create_dm()  # Create DM channel
                    await dm_channel.send("Hey, I'm back, human me will take over now!")
            except discord.HTTPException:
                # One unreachable user must not stop the others from being
                # notified, nor leave the notified list uncleared.
                traceback.print_exc()

        self.AFK_NOTIFIED_USERS.clear()  # Clear the AFK notified users list
        await message.reply("should work")

    async def _cmd_fmt(self, message):
        """``.fmt <template>``: replace the message with the evaluated f-string."""
        try:
            # SECURITY: evaluates message text as Python. Only reachable for
            # messages written by the logged-in account; never expose this to
            # other authors. ``self`` and ``message`` are visible to the
            # template through locals().
            formated = eval(f"f'{message.content[5:]}'", globals(), locals())
            print(formated)
            await asyncio.wait_for(message.edit(content=formated), timeout=5)
        except Exception as e:
            flashdel = await message.channel.send(f"Error: {e}", silent=True)
            await message.delete()
            await flashdel.delete()
            raise RuntimeWarning from e

    async def _cmd_eval(self, message):
        """``.eval <code>``: run the code as the body of an async function.

        Calls to saved commands that are coroutine functions are awaited
        automatically. The command message is always deleted afterwards.
        """
        try:
            formatted = message.content[6:]
            print(repr(formatted))
            exec_scope = {
                "msg": message,
                "asyncio": asyncio,
                "random": random,
                **self.loaded_commands,  # Inject loaded commands
                "out": lambda content: message.reply(content, silent=True),
            }

            # Collect the plain-name calls that target coroutine functions.
            # A set is used so each name is rewritten exactly once.
            coroutine_names = {
                node.func.id
                for node in ast.walk(ast.parse(formatted))
                if isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and asyncio.iscoroutinefunction(exec_scope.get(node.func.id))
            }
            for func_name in coroutine_names:
                # Skip attribute calls, longer identifiers that merely end in
                # the name, and calls that are already awaited.
                formatted = re.sub(
                    rf"(?<![\w.])(?<!await ){re.escape(func_name)}\(",
                    f"await {func_name}(",
                    formatted,
                )

            # SECURITY: executes message text as Python; owner-only by design.
            indented = formatted.replace("\n", "\n    ")
            exec(f"async def __eval():\n    {indented}", exec_scope)
            await exec_scope["__eval"]()
        except Exception:
            # Keep the tail of the traceback and stay within Discord's
            # 2000-character message limit so the edit itself cannot fail.
            await message.edit(content=traceback.format_exc()[-2000:])
        finally:
            await message.delete()

    async def _cmd_addcmd(self, message):
        """``.addcmd <name> <code>``: save the code as ``async def run(msg)`` and reload."""
        try:
            parts = message.content.split(" ", 2)
            if len(parts) < 3:
                await message.reply("Usage: .addcmd <name> <code>", silent=True)
                return

            cmd_name, code = parts[1], parts[2]
            # The name becomes both a file name and a variable inside .eval,
            # so anything that is not an identifier (e.g. "../x") is refused.
            if not cmd_name.isidentifier():
                await message.reply("Error: command name must be a valid Python identifier.", silent=True)
                return

            cmd_path = os.path.join(COMMANDS_DIR, f"{cmd_name}.py")
            with open(cmd_path, "w", encoding="utf-8") as f:
                f.write("async def run(msg):\n")
                f.writelines(f"    {line}\n" for line in code.split("\n"))

            self.loaded_commands = load_commands()
            if cmd_name in self.loaded_commands:
                await message.reply(f"Command {cmd_name} saved.", silent=True)
            else:
                await message.reply(
                    f"Command {cmd_name} saved but failed to load; check the console for details.",
                    silent=True,
                )
        except Exception as e:
            await message.reply(f"Error: {e}", silent=True)

    async def _cmd_delcmd(self, message):
        """``.delcmd <name>``: delete a saved command file and reload."""
        cmd_name = message.content.split(" ", 1)[1]
        cmd_path = os.path.join(COMMANDS_DIR, f"{cmd_name}.py")

        if os.path.exists(cmd_path):
            os.remove(cmd_path)
            self.loaded_commands = load_commands()
            await message.reply(f"Command {cmd_name} deleted.", silent=True)
        else:
            await message.reply(f"Command {cmd_name} not found.", silent=True)

    async def _cmd_listcmds(self, message):
        """``.listcmds``: list the names of all loaded commands."""
        cmds = list(self.loaded_commands)
        listing = ("Saved commands:\n" + ", ".join(cmds)) if cmds else "No saved commands."
        await message.reply(listing, silent=True)

    async def _cmd_delrecent(self, message):
        """``.delrecent <minutes>``: delete own messages newer than the cutoff.

        Only the 100 most recent messages of the channel are inspected.
        """
        try:
            minutes = float(message.content.split(" ", 1)[1])
            cutoff_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=minutes)

            deleted = 0
            async for msg in message.channel.history(limit=100):
                if msg.author == self.user and msg.created_at >= cutoff_time:
                    await msg.delete()
                    deleted += 1

            await message.channel.send(f"Deleted {deleted} messages.", delete_after=0, silent=True)
        except Exception as e:
            await message.channel.send(f"Error: {e}", delete_after=0, silent=True)

    @staticmethod
    def _channel_label(channel):
        """Return the channel's name, or its ID when it exposes no name."""
        return getattr(channel, "name", None) or channel.id

    async def _cmd_trackmessages(self, message):
        """``.trackmessages``: start echoing edits and deletions in this channel."""
        channel_id = message.channel.id
        if channel_id in self.tracked_channels:
            await message.reply("This channel is already being tracked.", silent=True)
            return
        self.tracked_channels.append(channel_id)
        save_tracked_channels(self.tracked_channels)
        await message.reply(
            f"Tracking messages in this channel {self._channel_label(message.channel)}.", silent=True
        )

    async def _cmd_untrackmessages(self, message):
        """``.untrackmessages``: stop echoing edits and deletions in this channel."""
        channel_id = message.channel.id
        if channel_id not in self.tracked_channels:
            await message.reply("This channel is not being tracked.", silent=True)
            return
        self.tracked_channels.remove(channel_id)
        save_tracked_channels(self.tracked_channels)
        await message.reply(
            f"Stopped tracking messages in {self._channel_label(message.channel)}.", silent=True
        )

    @staticmethod
    def _serialize_message(msg):
        """Convert a message into the JSON-serialisable dict written by ``.savechannel``."""
        author = msg.author
        reference = msg.reference
        return {
            "id": msg.id,
            "content": msg.content,
            "created_at": msg.created_at.isoformat() if msg.created_at else None,
            "edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
            "author": {
                "id": author.id,
                "name": author.name,
                "discriminator": author.discriminator,
                "bot": author.bot,
            } if author else None,
            "channel_id": msg.channel.id,
            "attachments": [{"filename": a.filename, "url": a.url} for a in msg.attachments],
            "embeds": [embed.to_dict() for embed in msg.embeds],
            "reactions": [{"emoji": str(r.emoji), "count": r.count} for r in msg.reactions],
            "mentions": [user.id for user in msg.mentions],
            "role_mentions": [role.id for role in msg.role_mentions],
            "pinned": msg.pinned,
            "tts": msg.tts,
            "type": str(msg.type),
            "reference": {
                "message_id": reference.message_id,
                "channel_id": reference.channel_id,
                "guild_id": reference.guild_id,
            } if reference else None,
        }

    async def _cmd_savechannel(self, message):
        """``.savechannel``: dump the channel's full history to ``downloads/<id>.json.gz``."""
        try:
            messages = [
                self._serialize_message(msg)
                async for msg in message.channel.history(limit=None)
            ]

            filename = f"{message.channel.id}.json.gz"
            filepath = os.path.join("downloads", filename)  # Save to a "downloads" folder
            os.makedirs("downloads", exist_ok=True)

            with gzip.open(filepath, "wt", encoding="utf-8") as f:
                json.dump(messages, f, ensure_ascii=False, indent=4)

            await message.edit(content=f"Saved messages to `{filepath}`")
        except Exception as e:
            await message.edit(content=f"Error: {e}")

    async def _cmd_repeat29(self, message):
        """``.repeat29``: post a fixed message every 29 minutes.

        The loop has no exit condition; it runs until the handler is cancelled
        or the process stops.
        """
        await message.reply("oop ok")
        while True:
            await message.channel.send("you asked dad")
            await asyncio.sleep(29 * 60)

    # ------------------------------------------------------------ tracking log

    @staticmethod
    async def process_log_whitelist(message):
        """Return ``True`` when the author is exempt from the tracking log.

        The author's numeric ID is compared; comparing the author object
        itself against an integer never matches.
        """
        return message.author.id in Selfbot._LOG_WHITELIST

    async def on_message_delete(self, message):
        """Echo a deleted message in its channel when that channel is tracked."""
        if await self.process_log_whitelist(message):
            return
        if message.channel.id not in self.tracked_channels:
            return

        member = message.author
        if member != self.user:
            await message.channel.send(f"<@{member.id}> deleted {message.content}", silent=True)

    async def on_message_edit(self, before, after):
        """Post a unified diff of an edited message when its channel is tracked."""
        if await self.process_log_whitelist(before):
            return
        if before.channel.id not in self.tracked_channels:
            return

        member = after.author
        if member == self.user:
            return
        if before.content == after.content:
            return

        diff = difflib.unified_diff(before.content.splitlines(), after.content.splitlines())
        diff_result = '\n'.join(diff)

        # Upload the full diff as an in-memory file attachment.
        with io.BytesIO(diff_result.encode('utf-8')) as diff_file:
            await after.channel.send(
                f"<@{member.id}> edited a message",
                file=discord.File(diff_file, "cutie.diff"),
                silent=True,
            )

    async def on_presence_update(self, before, after):
        """Record the watched user's latest status.

        The offline -> online transition is detected, but the greeting that
        would be sent for it is currently disabled.
        """
        if after.id != self._PRESENCE_WATCH_USER_ID:
            return

        old_status = self.last_status.get(after.id, discord.Status.offline)
        self.last_status[after.id] = after.status

        if old_status in [discord.Status.offline] and after.status == discord.Status.online:
            channel = self.get_channel(self._PRESENCE_CHANNEL_ID)
            if channel:
                ...
                #await channel.send(f"[BOT] Welcome back, {after.mention}!",silent=True)
|
|
|
|
# Build the client first, then read the login token from the environment.
client = Selfbot()

TOKEN = os.environ.get("TOKEN")
if TOKEN is None or TOKEN == "":
    raise ValueError("No TOKEN found in environment variables. Please add it to your .env file.")

client.run(TOKEN)
|