selfbot-legacy/app.py

403 lines
17 KiB
Python
Raw Normal View History

2025-05-07 16:44:50 +02:00
import random
import discord
import asyncio
import traceback
import random
import datetime
import os
import importlib.util
import ast
import websockets
import json
import difflib
import io
import gzip
import re
from dotenv import load_dotenv
# Load environment variables from a .env file into the process environment
# before anything reads them; the TOKEN lookup at the bottom of this file is
# done with os.getenv and its error message points the user at the .env file.
load_dotenv()
# One "<digits><unit>" chunk; a duration string is scanned for every such
# chunk (4m2s, 1h30m, etc.) and anything that does not match is ignored.
time_regex = re.compile(r'(\d+)([smhd])')


def parse_time(time_str):
    """Convert a compact duration string into a number of seconds.

    Every ``<digits><unit>`` chunk found in ``time_str`` is added up, where
    the unit is ``s``, ``m``, ``h`` or ``d``. Text that is not part of such a
    chunk is skipped rather than rejected.

    Returns the total as an int, or ``None`` when the total is not positive
    (no chunk matched, or every amount was zero).
    """
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}

    total = 0
    for chunk in time_regex.finditer(time_str):
        amount, unit = chunk.group(1), chunk.group(2)
        total += int(amount) * seconds_per_unit[unit]

    if total <= 0:
        return None
    return total
# Directory holding user-defined command modules; created on import so that
# os.listdir() in load_commands() never fails on a fresh checkout.
COMMANDS_DIR = "commands"
os.makedirs(COMMANDS_DIR, exist_ok=True)


def load_commands():
    """Import every ``*.py`` file in COMMANDS_DIR and collect its ``run`` callable.

    Each file is executed as its own module (named after the file without the
    ``.py`` suffix). A module contributes an entry only when it exposes a
    callable attribute called ``run``.

    A file that fails to import (syntax error, exception at import time, ...)
    is reported on stdout and skipped, so one broken command file cannot stop
    the remaining commands - or the caller - from loading.

    Returns:
        dict mapping command name -> that module's ``run`` callable.
    """
    commands = {}
    # sorted() makes the load order (and therefore the dict order) deterministic.
    for filename in sorted(os.listdir(COMMANDS_DIR)):
        if not filename.endswith(".py"):
            continue
        cmd_name = filename[:-3]
        cmd_path = os.path.join(COMMANDS_DIR, filename)
        try:
            spec = importlib.util.spec_from_file_location(cmd_name, cmd_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
        except Exception:
            # Boundary around arbitrary user code: log and move on.
            print(f"Failed to load command {cmd_name!r} from {cmd_path}:")
            traceback.print_exc()
            continue
        run = getattr(module, "run", None)
        if callable(run):
            commands[cmd_name] = run
    return commands
# JSON file that persists the IDs of channels whose edits/deletions are logged.
TRACKED_CHANNELS_FILE = "tracked_channels.json"


def load_tracked_channels():
    """Read the persisted list of tracked channel IDs.

    Returns:
        The list stored in TRACKED_CHANNELS_FILE, or an empty list when the
        file does not exist, cannot be decoded as JSON, or does not contain a
        JSON list. Unreadable content is reported on stdout instead of
        raising, so a damaged file does not prevent start-up.
    """
    try:
        with open(TRACKED_CHANNELS_FILE, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been tracked yet.
        return []
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Ignoring unreadable {TRACKED_CHANNELS_FILE}: {e}")
        return []
    if not isinstance(data, list):
        # Callers use .append()/.remove()/`in`, so anything else is unusable.
        print(f"Ignoring {TRACKED_CHANNELS_FILE}: expected a JSON list, got {type(data).__name__}")
        return []
    return data
def save_tracked_channels(tracked_channels):
    """Persist the list of tracked channel IDs to TRACKED_CHANNELS_FILE.

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    the real file with os.replace(), so a failure while serialising or
    writing leaves the previously saved list intact instead of a truncated
    file.

    Args:
        tracked_channels: JSON-serialisable list of channel IDs.

    Raises:
        TypeError / ValueError: propagated from json.dump for values that
            cannot be serialised.
        OSError: propagated from the file operations.
    """
    tmp_path = f"{TRACKED_CHANNELS_FILE}.tmp"
    with open(tmp_path, 'w', encoding="utf-8") as f:
        json.dump(tracked_channels, f)
    # Only reached when the dump succeeded; the swap replaces the old file in one step.
    os.replace(tmp_path, TRACKED_CHANNELS_FILE)
# User IDs that are refused the public commands.
BLACKLISTED_USER_IDS = frozenset({696800726084747314})


async def handle_blacklist(message):
    """Reply "no" to blacklisted authors and report whether the author is blacklisted.

    The check is made against ``message.author.id``; comparing the author
    object itself with an integer ID never matches.

    Args:
        message: the incoming message; needs ``author.id`` and an awaitable
            ``reply``.

    Returns:
        True when the author is blacklisted (the refusal has been sent),
        False otherwise. Callers that ignore the return value behave as
        before.
    """
    if message.author.id in BLACKLISTED_USER_IDS:
        await message.reply("no")
        return True
    return False
class Selfbot(discord.Client):
    """Discord client that reacts to messages, runs chat commands and logs edits/deletions.

    ``on_message`` works in three stages:

    1. commands anyone may use (``.remindme``, ``.rps``);
    2. passive handling of every message (AFK auto-reply in DMs, reactions,
       deletion of one specific author's messages);
    3. commands that are only honoured when the message was written by the
       logged-in account itself (everything in ``_OWNER_COMMANDS``).

    ``on_message_delete`` / ``on_message_edit`` echo deletions and edits into
    channels that were registered with ``.trackmessages``.
    """

    # Authors whose edits/deletions are never echoed into tracked channels.
    LOG_WHITELIST_IDS = frozenset({627566973869359104})

    # Owner-only commands as (message prefix, handler method name). No prefix
    # is a prefix of another one, so at most one entry matches a message.
    _OWNER_COMMANDS = (
        (".horse", "_cmd_horse"),
        (".afk", "_cmd_afk"),
        (".unafk", "_cmd_unafk"),
        (".fmt ", "_cmd_fmt"),
        (".eval ", "_cmd_eval"),
        (".addcmd ", "_cmd_addcmd"),
        (".delcmd ", "_cmd_delcmd"),
        (".listcmds", "_cmd_listcmds"),
        (".delrecent ", "_cmd_delrecent"),
        (".trackmessages", "_cmd_trackmessages"),
        (".untrackmessages", "_cmd_untrackmessages"),
        (".savechannel", "_cmd_savechannel"),
        (".repeat29", "_cmd_repeat29"),
    )

    def __init__(self):
        super().__init__()
        self.default_status = None
        self.loaded_commands = load_commands()  # name -> run callable from COMMANDS_DIR
        self.tracked_channels = load_tracked_channels()  # channel IDs with edit/delete logging
        self.last_status = {}  # user ID -> last status seen in on_presence_update
        self.AFK_STATUS = False  # True while AFK auto-replies are active
        self.AFK_NOTIFIED_USERS = []  # IDs of users who already got the AFK notice
        self.horsin = []  # channel IDs where every message gets a horse reaction
        # Strong references to fire-and-forget tasks (reminders); the event
        # loop itself only keeps weak references to tasks.
        self._background_tasks = set()

    async def on_ready(self):
        """Print the logged-in account once the client is ready."""
        print(f"Logged in as {self.user}")

    async def handle_afk_dm(self, message):
        """Send the AFK notice to a DM author, once per author per AFK period."""
        if self.AFK_STATUS and message.author.id not in self.AFK_NOTIFIED_USERS:
            await message.reply(
                "Heya, I'm not at my computer right now, if you're requesting something please follow <https://nohello.club>. I'll let you know when I'm back :) \n\n-# This action was automated."
            )
            self.AFK_NOTIFIED_USERS.append(message.author.id)

    async def on_message(self, message):
        """Entry point for every message the client sees (see the class docstring)."""
        content = message.content

        if message.author.id == 1169111190824308768 and "<@1236667927944761396>" in content:
            await message.reply("shut the fuck up")

        # --- stage 1: commands open to everyone --------------------------
        # handle_blacklist() returns a truthy value for refused authors; the
        # command is skipped for them but passive handling below still runs.
        if content.startswith(".remindme "):
            if not await handle_blacklist(message):
                await self._cmd_remindme(message)
        elif content.startswith(".rps "):
            if not await handle_blacklist(message):
                if await self._cmd_rps(message):
                    return

        # --- stage 2: passive handling -----------------------------------
        # Handle DM if in AFK mode
        if isinstance(message.channel, discord.DMChannel) and message.author != self.user:
            await self.handle_afk_dm(message)
        if message.channel.id in self.horsin:
            await message.add_reaction("🐴")
        if message.author.id == 1341423498618208257:
            await message.add_reaction("🪣")

        if message.author != self.user:
            if message.author.id == 1351739454141763626:
                try:
                    await message.delete()
                except discord.HTTPException:
                    # Best effort: missing permission / already deleted is fine.
                    pass
            return

        # --- stage 3: owner-only commands --------------------------------
        for prefix, handler_name in self._OWNER_COMMANDS:
            if content.startswith(prefix):
                await getattr(self, handler_name)(message)
                break

    # ------------------------------------------------------------------
    # Public commands
    # ------------------------------------------------------------------

    async def _cmd_remindme(self, message):
        """``.remindme <time> <message>``: reply with the message after the given delay."""
        parts = message.content.split(" ", 2)
        if len(parts) < 3:
            await message.reply("Usage: `.remindme <time> <message>`", silent=True)
            return
        duration = parse_time(parts[1])
        if duration is None:
            await message.reply("Invalid time format. Example: `4m2s`, `1h30m`.", silent=True)
            return

        # "@" is replaced so the reminder text cannot ping anyone. Done
        # outside the f-string: reusing the same quote type inside an
        # f-string expression needs Python 3.12+.
        reminder_text = parts[2].replace("@", "at")
        await message.reply(f"Reminder set for {parts[1]}.", silent=True)

        async def reminder_task():
            await asyncio.sleep(duration)
            await message.reply(f"{message.author.mention} Reminder: {reminder_text}")

        # Keep a reference until the task finishes so it cannot be garbage
        # collected while it is still sleeping.
        task = asyncio.create_task(reminder_task())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _cmd_rps(self, message):
        """``.rps <item>``: play one round of rock-paper-scissors.

        Returns:
            True when ``on_message`` should stop processing this message
            (usage error, easter eggs, invalid pick, tie) and False after a
            decided round, mirroring where the original inline code returned
            early and where it fell through.
        """
        parts = message.content.split(" ", 2)
        if len(parts) != 2:
            await message.reply("Usage: `.rps <item>`", silent=True)
            return True

        item = parts[1]
        if item.lower() == "dick":
            await message.reply("Scissors beats dick any day :3", silent=True)  # little easter egg
            return True
        if item == "<@696800726084747314>":
            await message.reply("Head so thick that i would try rock but the rock would break")
            return True

        names = {1: "rock", 2: "paper", 3: "scissors"}
        beats = {1: 3, 2: 1, 3: 2}  # key beats value

        # The pick is recognised by its first letter only (r / p / s).
        lowered = item.lower()
        player = next((n for n, name in names.items() if lowered.startswith(name[0])), 0)
        if player == 0:
            await message.reply("Invalid choice!", silent=True)
            return True

        choice = random.choice([1, 2, 3])
        if choice == player:
            await message.reply("Huh we chose the same thing. Try again!", silent=True)
            return True
        if beats[player] == choice:
            # The player's pick beats ours, so the player is the winner.
            await message.reply(f"Welp, ggs, you won. I chose `{names[choice]}`", silent=True)
        else:
            await message.reply(f"Oop, you lost. Try again later! I chose `{names[choice]}`", silent=True)
        return False

    # ------------------------------------------------------------------
    # Owner-only commands
    # ------------------------------------------------------------------

    async def _cmd_horse(self, message):
        """``.horse``: toggle the horse reaction for the current channel."""
        channel_id = message.channel.id
        if channel_id in self.horsin:
            self.horsin.remove(channel_id)
            await message.reply("no longer horsin around D:")
        else:
            self.horsin.append(channel_id)
            await message.reply(":D")

    async def _cmd_afk(self, message):
        """``.afk``: switch AFK auto-replies on."""
        if self.AFK_STATUS:
            await message.reply("You are already in AFK mode.", silent=True)
            return
        self.AFK_STATUS = True
        await message.reply("k")

    async def _cmd_unafk(self, message):
        """``.unafk``: switch AFK off and tell everyone who got the AFK notice."""
        if not self.AFK_STATUS:
            await message.reply("You are not in AFK mode.", silent=True)
            return
        self.AFK_STATUS = False
        for user_id in self.AFK_NOTIFIED_USERS:
            try:
                user = await self.fetch_user(user_id)  # Fetch user by ID
                if user:
                    dm_channel = await user.create_dm()  # Create DM channel
                    await dm_channel.send("Hey, I'm back, human me will take over now!")
            except Exception:
                # One unreachable user must not stop the others from being
                # notified or leave the notified list uncleared.
                print(f"Could not send the back-from-AFK notice to user {user_id}:")
                traceback.print_exc()
        self.AFK_NOTIFIED_USERS.clear()  # Clear the AFK notified users list
        await message.reply("should work")

    async def _cmd_fmt(self, message):
        """``.fmt <template>``: evaluate the text as an f-string and edit the message to the result.

        On failure the error is flashed in the channel, the command message
        is deleted and the error is re-raised as RuntimeWarning.
        """
        try:
            # NOTE(security): eval() of message text. Only reached for
            # messages written by the logged-in account; never expose this to
            # other authors. `self` and `message` are visible to the template
            # through locals().
            formated = eval(f"f'{message.content[5:]}'", globals(), locals())
            print(formated)
            await asyncio.wait_for(message.edit(content=formated), timeout=5)
        except Exception as e:
            flashdel = await message.channel.send(f"Error: {e}", silent=True)
            await message.delete()
            await flashdel.delete()
            raise RuntimeWarning from e

    async def _cmd_eval(self, message):
        """``.eval <code>``: run the code inside an async function, then delete the command message.

        The code sees ``msg``, ``asyncio``, ``random``, ``out(content)`` and
        every loaded command by name. Plain calls to loaded commands that are
        coroutine functions get an ``await`` inserted automatically.
        """
        try:
            source = message.content[6:]
            print(repr(source))
            # NOTE(security): exec() of message text; owner-only by design.
            exec_scope = {
                "msg": message,
                "asyncio": asyncio,
                "random": random,
                **self.loaded_commands,  # Inject loaded commands
                "out": lambda content: message.reply(content, silent=True),
            }

            # Collect the coroutine functions that are called by bare name.
            # Only ast.Name callees have an `.id`; lambdas, subscripts, etc.
            # are skipped.
            awaitable_names = set()
            for node in ast.walk(ast.parse(source)):
                if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                    func_name = node.func.id
                    if func_name in exec_scope and asyncio.iscoroutinefunction(exec_scope[func_name]):
                        awaitable_names.add(func_name)

            # One substitution per name (not per call site), skipping calls
            # that are already awaited and longer identifiers / attribute
            # accesses that merely end in the same name.
            for func_name in awaitable_names:
                source = re.sub(
                    rf"(?<!await )(?<![\w.]){re.escape(func_name)}\(",
                    f"await {func_name}(",
                    source,
                )

            body = source.replace("\n", "\n    ")
            exec(f"async def __eval():\n    {body}", exec_scope)
            await exec_scope["__eval"]()
        except Exception:
            # The message is deleted right below, so also keep the traceback
            # on the console.
            traceback.print_exc()
            await message.edit(content=traceback.format_exc())
        finally:
            await message.delete()

    @staticmethod
    def _is_valid_command_name(cmd_name):
        """Return True when ``cmd_name`` is usable as a command name.

        Commands are injected into the ``.eval`` scope by name and stored as
        ``<name>.py`` inside COMMANDS_DIR, so the name has to be a Python
        identifier (which also rules out path separators and ``..``).
        """
        return cmd_name.isidentifier()

    async def _cmd_addcmd(self, message):
        """``.addcmd <name> <code>``: save the code as ``async def run(msg)`` in COMMANDS_DIR."""
        try:
            parts = message.content.split(" ", 2)
            if len(parts) < 3:
                await message.reply("Usage: .addcmd <name> <code>", silent=True)
                return
            cmd_name, code = parts[1], parts[2]
            if not self._is_valid_command_name(cmd_name):
                await message.reply(f"Error: {cmd_name!r} is not a valid command name", silent=True)
                return

            cmd_path = os.path.join(COMMANDS_DIR, f"{cmd_name}.py")
            source = "async def run(msg):\n" + "".join(
                f"    {line}\n" for line in code.split("\n")
            )
            # Reject code that does not compile before it reaches the disk;
            # the SyntaxError is reported by the handler below.
            compile(source, cmd_path, "exec")
            # The import machinery reads source files as UTF-8 by default.
            with open(cmd_path, "w", encoding="utf-8") as f:
                f.write(source)
            self.loaded_commands = load_commands()
            await message.reply(f"Command {cmd_name} saved.", silent=True)
        except Exception as e:
            await message.reply(f"Error: {e}", silent=True)

    async def _cmd_delcmd(self, message):
        """``.delcmd <name>``: delete a saved command file and reload the commands."""
        cmd_name = message.content.split(" ", 1)[1]
        cmd_path = os.path.join(COMMANDS_DIR, f"{cmd_name}.py")
        if self._is_valid_command_name(cmd_name) and os.path.exists(cmd_path):
            os.remove(cmd_path)
            self.loaded_commands = load_commands()
            await message.reply(f"Command {cmd_name} deleted.", silent=True)
        else:
            await message.reply(f"Command {cmd_name} not found.", silent=True)

    async def _cmd_listcmds(self, message):
        """``.listcmds``: list the names of the loaded commands."""
        cmds = list(self.loaded_commands)
        if cmds:
            text = "Saved commands:\n" + ", ".join(cmds)
        else:
            text = "No saved commands."
        await message.reply(text, silent=True)

    async def _cmd_delrecent(self, message):
        """``.delrecent <minutes>``: delete own messages newer than the cutoff.

        Only the 100 most recent messages of the channel are inspected.
        """
        try:
            minutes = float(message.content.split(" ", 1)[1])
            now = datetime.datetime.now(datetime.timezone.utc)
            cutoff_time = now - datetime.timedelta(minutes=minutes)
            deleted = 0
            async for msg in message.channel.history(limit=100):
                if msg.author == self.user and msg.created_at >= cutoff_time:
                    await msg.delete()
                    deleted += 1
            await message.channel.send(f"Deleted {deleted} messages.", delete_after=0, silent=True)
        except Exception as e:
            await message.channel.send(f"Error: {e}", delete_after=0, silent=True)

    @staticmethod
    def _channel_label(channel):
        """Return ``channel.name`` when the channel has that attribute, else ``str(channel)``."""
        return getattr(channel, "name", str(channel))

    async def _cmd_trackmessages(self, message):
        """``.trackmessages``: start logging edits/deletions in the current channel."""
        channel_id = message.channel.id
        if channel_id in self.tracked_channels:
            await message.reply("This channel is already being tracked.", silent=True)
            return
        self.tracked_channels.append(channel_id)
        save_tracked_channels(self.tracked_channels)
        await message.reply(
            f"Tracking messages in this channel {self._channel_label(message.channel)}.",
            silent=True,
        )

    async def _cmd_untrackmessages(self, message):
        """``.untrackmessages``: stop logging edits/deletions in the current channel."""
        channel_id = message.channel.id
        if channel_id not in self.tracked_channels:
            await message.reply("This channel is not being tracked.", silent=True)
            return
        self.tracked_channels.remove(channel_id)
        save_tracked_channels(self.tracked_channels)
        await message.reply(
            f"Stopped tracking messages in {self._channel_label(message.channel)}.",
            silent=True,
        )

    @staticmethod
    def _serialize_message(msg):
        """Turn one message into the JSON-serialisable dict written by ``.savechannel``."""
        return {
            "id": msg.id,
            "content": msg.content,
            "created_at": msg.created_at.isoformat() if msg.created_at else None,
            "edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
            "author": {
                "id": msg.author.id,
                "name": msg.author.name,
                "discriminator": msg.author.discriminator,
                "bot": msg.author.bot,
            } if msg.author else None,
            "channel_id": msg.channel.id,
            "attachments": [{"filename": a.filename, "url": a.url} for a in msg.attachments],
            "embeds": [embed.to_dict() for embed in msg.embeds],
            "reactions": [{"emoji": str(r.emoji), "count": r.count} for r in msg.reactions],
            "mentions": [user.id for user in msg.mentions],
            "role_mentions": [role.id for role in msg.role_mentions],
            "pinned": msg.pinned,
            "tts": msg.tts,
            "type": str(msg.type),
            "reference": {
                "message_id": msg.reference.message_id,
                "channel_id": msg.reference.channel_id,
                "guild_id": msg.reference.guild_id,
            } if msg.reference else None,
        }

    async def _cmd_savechannel(self, message):
        """``.savechannel``: dump the whole channel history to ``downloads/<channel id>.json.gz``."""
        try:
            messages = [
                self._serialize_message(msg)
                async for msg in message.channel.history(limit=None)
            ]
            filename = f"{message.channel.id}.json.gz"
            filepath = os.path.join("downloads", filename)  # Save to a "downloads" folder
            os.makedirs("downloads", exist_ok=True)
            with gzip.open(filepath, "wt", encoding="utf-8") as f:
                json.dump(messages, f, ensure_ascii=False, indent=4)
            await message.edit(content=f"Saved messages to `{filepath}`")
        except Exception as e:
            await message.edit(content=f"Error: {e}")

    async def _cmd_repeat29(self, message):
        """``.repeat29``: post a fixed message in the channel every 29 minutes, forever.

        This coroutine never returns; there is no command to stop the loop.
        """
        await message.reply("oop ok")
        while True:
            await message.channel.send("you asked dad")
            await asyncio.sleep(29 * 60)

    # ------------------------------------------------------------------
    # Edit / delete logging
    # ------------------------------------------------------------------

    @staticmethod
    async def process_log_whitelist(message):
        """Return True when the message's author is exempt from edit/delete logging."""
        return message.author.id in Selfbot.LOG_WHITELIST_IDS

    async def on_message_delete(self, message):
        """Echo a deleted message into its channel when that channel is tracked."""
        if await self.process_log_whitelist(message):
            return
        if message.channel.id not in self.tracked_channels:
            return
        member = message.author
        if member != self.user:
            await message.channel.send(f"<@{member.id}> deleted {message.content}", silent=True)

    async def on_message_edit(self, before, after):
        """Post a unified diff of an edited message when its channel is tracked."""
        if await self.process_log_whitelist(before):
            return
        if before.channel.id not in self.tracked_channels:
            return
        member = after.author
        if member == self.user:
            return
        if before.content == after.content:
            return

        # The inputs carry no trailing newlines, so lineterm="" keeps the
        # header lines newline-free too and the join below uniform.
        diff = difflib.unified_diff(
            before.content.splitlines(),
            after.content.splitlines(),
            lineterm="",
        )
        diff_result = '\n'.join(diff)

        # In-memory file holding the full diff; it is sent as an attachment.
        with io.BytesIO(diff_result.encode('utf-8')) as diff_file:
            await after.channel.send(
                f"<@{member.id}> edited a message",
                file=discord.File(diff_file, "cutie.diff"),
                silent=True,
            )

    async def on_presence_update(self, before, after):
        """Remember the last status of one specific user.

        The offline -> online transition is detected, but the greeting that
        used to be sent for it is disabled.
        """
        if after.id != 627566973869359104:
            return
        old_status = self.last_status.get(after.id, discord.Status.offline)
        self.last_status[after.id] = after.status
        if old_status == discord.Status.offline and after.status == discord.Status.online:
            channel = self.get_channel(1302691935152246851)
            if channel:
                # Greeting disabled on purpose:
                # await channel.send(f"[BOT] Welcome back, {after.mention}!", silent=True)
                pass
client = Selfbot()

# The login token is read from the environment (populated from the .env file
# at the top of this module); without it the client cannot be started.
TOKEN = os.environ.get("TOKEN")
if TOKEN:
    client.run(TOKEN)
else:
    raise ValueError("No TOKEN found in environment variables. Please add it to your .env file.")