Merge remote-tracking branch 'origin/main'
This commit is contained in:
commit
a667d4f701
173
bot/cogs/cog_manager.py
Normal file
173
bot/cogs/cog_manager.py
Normal file
|
@ -0,0 +1,173 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import importlib
|
||||||
|
import importlib.util
|
||||||
|
import inspect
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
|
class BaseCog:
|
||||||
|
"""Base cog class that all cogs should inherit from"""
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
self.name = self.__class__.__name__
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""Called when cog is unloaded"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
class CogManager:
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
self.cogs: Dict[str, BaseCog] = {}
|
||||||
|
self.cogs_dir = os.path.join("bot", "cogs")
|
||||||
|
os.makedirs(self.cogs_dir, exist_ok=True)
|
||||||
|
|
||||||
|
def load_cog(self, cog_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
Load a cog by name
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cog_name: Name of the cog file (without .py)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if loaded successfully, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# If already loaded, unload first
|
||||||
|
if cog_name in self.cogs:
|
||||||
|
self.unload_cog(cog_name)
|
||||||
|
|
||||||
|
# Get full path to cog file
|
||||||
|
cog_path = os.path.join(self.cogs_dir, f"{cog_name}.py")
|
||||||
|
if not os.path.exists(cog_path):
|
||||||
|
print(f"Cog file not found: {cog_path}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Create a unique module name to avoid conflicts
|
||||||
|
module_name = f"bot.cogs.{cog_name}"
|
||||||
|
|
||||||
|
# Load the module
|
||||||
|
spec = importlib.util.spec_from_file_location(module_name, cog_path)
|
||||||
|
if not spec:
|
||||||
|
print(f"Failed to create spec for {cog_path}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules[module_name] = module
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
|
||||||
|
# Find and instantiate the cog class
|
||||||
|
# A cog class is a class that inherits from BaseCog
|
||||||
|
cog_class = None
|
||||||
|
for name, obj in inspect.getmembers(module):
|
||||||
|
if (inspect.isclass(obj) and
|
||||||
|
issubclass(obj, BaseCog) and
|
||||||
|
obj is not BaseCog):
|
||||||
|
cog_class = obj
|
||||||
|
break
|
||||||
|
|
||||||
|
if not cog_class:
|
||||||
|
print(f"No cog class found in {cog_path}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Instantiate the cog
|
||||||
|
cog = cog_class(self.bot)
|
||||||
|
self.cogs[cog_name] = cog
|
||||||
|
|
||||||
|
# Register commands
|
||||||
|
for name, method in inspect.getmembers(cog, inspect.ismethod):
|
||||||
|
if name.startswith("cmd_"):
|
||||||
|
cmd_name = name[4:] # Remove "cmd_" prefix
|
||||||
|
self.bot.loaded_commands[cmd_name] = method
|
||||||
|
|
||||||
|
print(f"Loaded cog: {cog_name}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error loading cog {cog_name}: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def unload_cog(self, cog_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
Unload a cog by name
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cog_name: Name of the cog
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if unloaded successfully, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if cog_name not in self.cogs:
|
||||||
|
print(f"Cog not loaded: {cog_name}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
cog = self.cogs[cog_name]
|
||||||
|
|
||||||
|
# Call cleanup method
|
||||||
|
cog.cleanup()
|
||||||
|
|
||||||
|
# Remove commands from loaded_commands
|
||||||
|
for name, method in inspect.getmembers(cog, inspect.ismethod):
|
||||||
|
if name.startswith("cmd_"):
|
||||||
|
cmd_name = name[4:]
|
||||||
|
if cmd_name in self.bot.loaded_commands:
|
||||||
|
del self.bot.loaded_commands[cmd_name]
|
||||||
|
|
||||||
|
# Remove cog
|
||||||
|
del self.cogs[cog_name]
|
||||||
|
|
||||||
|
# Remove module
|
||||||
|
module_name = f"bot.cogs.{cog_name}"
|
||||||
|
if module_name in sys.modules:
|
||||||
|
del sys.modules[module_name]
|
||||||
|
|
||||||
|
print(f"Unloaded cog: {cog_name}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error unloading cog {cog_name}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def reload_cog(self, cog_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
Reload a cog by name
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cog_name: Name of the cog
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if reloaded successfully, False otherwise
|
||||||
|
"""
|
||||||
|
return self.load_cog(cog_name)
|
||||||
|
|
||||||
|
def load_all_cogs(self) -> int:
|
||||||
|
"""
|
||||||
|
Load all cogs from the cogs directory
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: Number of cogs loaded successfully
|
||||||
|
"""
|
||||||
|
count = 0
|
||||||
|
for filename in os.listdir(self.cogs_dir):
|
||||||
|
if filename.endswith(".py") and not filename.startswith("_"):
|
||||||
|
cog_name = filename[:-3]
|
||||||
|
if self.load_cog(cog_name):
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
||||||
|
def unload_all_cogs(self) -> int:
|
||||||
|
"""
|
||||||
|
Unload all cogs
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: Number of cogs unloaded successfully
|
||||||
|
"""
|
||||||
|
cog_names = list(self.cogs.keys())
|
||||||
|
count = 0
|
||||||
|
for cog_name in cog_names:
|
||||||
|
if self.unload_cog(cog_name):
|
||||||
|
count += 1
|
||||||
|
return count
|
33
bot/cogs/example_cog.py
Normal file
33
bot/cogs/example_cog.py
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
from bot.cogs.cog_manager import BaseCog
|
||||||
|
import discord
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
class ExampleCog(BaseCog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
super().__init__(bot)
|
||||||
|
# Initialize any state for this cog
|
||||||
|
self.counter = 0
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
# Clean up any resources when unloaded
|
||||||
|
print("Example cog is being unloaded and cleaned up")
|
||||||
|
|
||||||
|
async def cmd_hello(self, message):
|
||||||
|
"""
|
||||||
|
A simple hello command
|
||||||
|
Usage: .hello
|
||||||
|
"""
|
||||||
|
self.counter += 1
|
||||||
|
await message.edit(content=f"👋 Hello! I've been greeted {self.counter} times.")
|
||||||
|
|
||||||
|
async def cmd_echo(self, message):
|
||||||
|
"""
|
||||||
|
Echo the user's message
|
||||||
|
Usage: .echo <message>
|
||||||
|
"""
|
||||||
|
content = message.content
|
||||||
|
if ' ' in content:
|
||||||
|
text = content.split(' ', 1)[1]
|
||||||
|
await message.edit(content=f"🔊 {text}")
|
||||||
|
else:
|
||||||
|
await message.edit(content="❌ Usage: `.echo <message>`")
|
|
@ -62,4 +62,70 @@ class AdminCommands:
|
||||||
save_tracked_channels(self.bot.tracked_channels)
|
save_tracked_channels(self.bot.tracked_channels)
|
||||||
await message.reply(f"Stopped tracking messages in {message.channel.name}.", silent=True)
|
await message.reply(f"Stopped tracking messages in {message.channel.name}.", silent=True)
|
||||||
else:
|
else:
|
||||||
await message.reply("This channel is not being tracked.", silent=True)
|
await message.reply("This channel is not being tracked.", silent=True)
|
||||||
|
|
||||||
|
async def cmd_loadcog(self, message):
|
||||||
|
"""
|
||||||
|
Load a cog
|
||||||
|
Usage: .loadcog <cog_name>
|
||||||
|
"""
|
||||||
|
content = message.content.strip()
|
||||||
|
parts = content.split()
|
||||||
|
|
||||||
|
if len(parts) != 2:
|
||||||
|
await message.edit(content="❌ Usage: `.loadcog <cog_name>`")
|
||||||
|
return
|
||||||
|
|
||||||
|
cog_name = parts[1]
|
||||||
|
if self.bot.cog_manager.load_cog(cog_name):
|
||||||
|
await message.edit(content=f"✅ Loaded cog: `{cog_name}`")
|
||||||
|
else:
|
||||||
|
await message.edit(content=f"❌ Failed to load cog: `{cog_name}`")
|
||||||
|
|
||||||
|
async def cmd_unloadcog(self, message):
|
||||||
|
"""
|
||||||
|
Unload a cog
|
||||||
|
Usage: .unloadcog <cog_name>
|
||||||
|
"""
|
||||||
|
content = message.content.strip()
|
||||||
|
parts = content.split()
|
||||||
|
|
||||||
|
if len(parts) != 2:
|
||||||
|
await message.edit(content="❌ Usage: `.unloadcog <cog_name>`")
|
||||||
|
return
|
||||||
|
|
||||||
|
cog_name = parts[1]
|
||||||
|
if self.bot.cog_manager.unload_cog(cog_name):
|
||||||
|
await message.edit(content=f"✅ Unloaded cog: `{cog_name}`")
|
||||||
|
else:
|
||||||
|
await message.edit(content=f"❌ Failed to unload cog: `{cog_name}`")
|
||||||
|
|
||||||
|
async def cmd_reloadcog(self, message):
|
||||||
|
"""
|
||||||
|
Reload a cog
|
||||||
|
Usage: .reloadcog <cog_name>
|
||||||
|
"""
|
||||||
|
content = message.content.strip()
|
||||||
|
parts = content.split()
|
||||||
|
|
||||||
|
if len(parts) != 2:
|
||||||
|
await message.edit(content="❌ Usage: `.reloadcog <cog_name>`")
|
||||||
|
return
|
||||||
|
|
||||||
|
cog_name = parts[1]
|
||||||
|
if self.bot.cog_manager.reload_cog(cog_name):
|
||||||
|
await message.edit(content=f"✅ Reloaded cog: `{cog_name}`")
|
||||||
|
else:
|
||||||
|
await message.edit(content=f"❌ Failed to reload cog: `{cog_name}`")
|
||||||
|
|
||||||
|
async def cmd_listcogs(self, message):
|
||||||
|
"""
|
||||||
|
List all loaded cogs
|
||||||
|
Usage: .listcogs
|
||||||
|
"""
|
||||||
|
if not self.bot.cog_manager.cogs:
|
||||||
|
await message.edit(content="No cogs are currently loaded.")
|
||||||
|
return
|
||||||
|
|
||||||
|
cog_list = "\n".join(f"• {name}" for name in sorted(self.bot.cog_manager.cogs.keys()))
|
||||||
|
await message.edit(content=f"**Loaded Cogs:**\n{cog_list}")
|
|
@ -4,37 +4,6 @@ import asyncio
|
||||||
class UserManagementCommands:
|
class UserManagementCommands:
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
# Store lists of ignored/muted users and channels
|
|
||||||
self.ignored_channels = set()
|
|
||||||
self.muted_channels = set()
|
|
||||||
self.muted_users = set()
|
|
||||||
|
|
||||||
async def cmd_ignore(self, message):
|
|
||||||
"""
|
|
||||||
Ignore all users in the current channel except for self
|
|
||||||
Usage: .ignore
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
channel_id = message.channel.id
|
|
||||||
self.ignored_channels.add(channel_id)
|
|
||||||
await message.edit(content=f"✅ Ignoring users in this channel. Use `.unignore` to revert.")
|
|
||||||
except Exception as e:
|
|
||||||
await message.edit(content=f"❌ Error ignoring channel: {str(e)}")
|
|
||||||
|
|
||||||
async def cmd_unignore(self, message):
|
|
||||||
"""
|
|
||||||
Stop ignoring users in the current channel
|
|
||||||
Usage: .unignore
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
channel_id = message.channel.id
|
|
||||||
if channel_id in self.ignored_channels:
|
|
||||||
self.ignored_channels.remove(channel_id)
|
|
||||||
await message.edit(content=f"✅ No longer ignoring users in this channel.")
|
|
||||||
else:
|
|
||||||
await message.edit(content=f"❌ This channel is not being ignored.")
|
|
||||||
except Exception as e:
|
|
||||||
await message.edit(content=f"❌ Error: {str(e)}")
|
|
||||||
|
|
||||||
async def cmd_close(self, message):
|
async def cmd_close(self, message):
|
||||||
"""
|
"""
|
||||||
|
@ -52,36 +21,9 @@ class UserManagementCommands:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await message.edit(content=f"❌ Error closing DM: {str(e)}")
|
await message.edit(content=f"❌ Error closing DM: {str(e)}")
|
||||||
|
|
||||||
async def cmd_mute(self, message):
|
|
||||||
"""
|
|
||||||
Mute the current channel (no notifications)
|
|
||||||
Usage: .mute
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
channel_id = message.channel.id
|
|
||||||
self.muted_channels.add(channel_id)
|
|
||||||
await message.edit(content=f"✅ Channel muted. Use `.unmute` to revert.")
|
|
||||||
except Exception as e:
|
|
||||||
await message.edit(content=f"❌ Error muting channel: {str(e)}")
|
|
||||||
|
|
||||||
async def cmd_unmute(self, message):
|
|
||||||
"""
|
|
||||||
Unmute the current channel
|
|
||||||
Usage: .unmute
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
channel_id = message.channel.id
|
|
||||||
if channel_id in self.muted_channels:
|
|
||||||
self.muted_channels.remove(channel_id)
|
|
||||||
await message.edit(content=f"✅ Channel unmuted.")
|
|
||||||
else:
|
|
||||||
await message.edit(content=f"❌ This channel is not muted.")
|
|
||||||
except Exception as e:
|
|
||||||
await message.edit(content=f"❌ Error: {str(e)}")
|
|
||||||
|
|
||||||
async def cmd_block(self, message):
|
async def cmd_block(self, message):
|
||||||
"""
|
"""
|
||||||
Block a user
|
Block a user using Discord's native block function
|
||||||
Usage: .block @user or .block [in reply to a message]
|
Usage: .block @user or .block [in reply to a message]
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
@ -99,7 +41,7 @@ class UserManagementCommands:
|
||||||
return
|
return
|
||||||
|
|
||||||
await user.block()
|
await user.block()
|
||||||
await message.edit(content=f"✅ Blocked user {user.name}#{user.discriminator}.")
|
await message.edit(content=f"✅ Blocked {user.name}#{user.discriminator}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await message.edit(content=f"❌ Error blocking user: {str(e)}")
|
await message.edit(content=f"❌ Error blocking user: {str(e)}")
|
||||||
|
|
||||||
|
@ -139,4 +81,4 @@ class UserManagementCommands:
|
||||||
|
|
||||||
# Method to check if a channel is muted
|
# Method to check if a channel is muted
|
||||||
def is_channel_muted(self, channel_id):
|
def is_channel_muted(self, channel_id):
|
||||||
return channel_id in self.muted_channels
|
return channel_id in self.muted_channels
|
||||||
|
|
|
@ -80,12 +80,6 @@ class MessageHandler:
|
||||||
if message.author.bot:
|
if message.author.bot:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check if the channel is in ignored channels list and message is not from bot user
|
|
||||||
if (message.author != self.bot.user and
|
|
||||||
hasattr(self, 'user_management_commands') and
|
|
||||||
self.user_management_commands.is_channel_ignored(message.channel.id)):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Look for and replace time patterns (only for non-command messages)
|
# Look for and replace time patterns (only for non-command messages)
|
||||||
if not message.content.startswith('.'):
|
if not message.content.startswith('.'):
|
||||||
original_content = message.content
|
original_content = message.content
|
||||||
|
@ -140,32 +134,26 @@ class MessageHandler:
|
||||||
"""Handle commands issued by the bot user"""
|
"""Handle commands issued by the bot user"""
|
||||||
content = message.content
|
content = message.content
|
||||||
|
|
||||||
# Check for custom commands first
|
# Skip if not a command
|
||||||
if content.startswith('.'):
|
if not content.startswith('.'):
|
||||||
cmd = content.split()[0][1:] # Remove the '.' and get the command name
|
return
|
||||||
if hasattr(self.bot, 'loaded_commands') and cmd in self.bot.loaded_commands:
|
|
||||||
# Execute the custom command
|
|
||||||
try:
|
|
||||||
# Extract arguments from the command
|
|
||||||
args = content.split(' ')[1:] if ' ' in content else []
|
|
||||||
await self.bot.loaded_commands[cmd](self.bot, message, args)
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error executing custom command {cmd}: {e}")
|
|
||||||
await message.channel.send(f"Error executing command: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# User Management Commands
|
cmd_parts = content.split()
|
||||||
if content.startswith(".ignore"):
|
cmd = cmd_parts[0][1:] # Get command name without the '.'
|
||||||
await self.user_management_commands.cmd_ignore(message)
|
|
||||||
elif content.startswith(".unignore"):
|
# Check for custom/loaded commands first
|
||||||
await self.user_management_commands.cmd_unignore(message)
|
if hasattr(self.bot, 'loaded_commands') and cmd in self.bot.loaded_commands:
|
||||||
elif content.startswith(".close"):
|
try:
|
||||||
|
await self.bot.loaded_commands[cmd](message)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error executing command {cmd}: {e}")
|
||||||
|
await message.edit(content=f"❌ Error executing command: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# User Management Commands - only keeping close and block
|
||||||
|
if content.startswith(".close"):
|
||||||
await self.user_management_commands.cmd_close(message)
|
await self.user_management_commands.cmd_close(message)
|
||||||
elif content.startswith(".mute"):
|
|
||||||
await self.user_management_commands.cmd_mute(message)
|
|
||||||
elif content.startswith(".unmute"):
|
|
||||||
await self.user_management_commands.cmd_unmute(message)
|
|
||||||
elif content.startswith(".block"):
|
elif content.startswith(".block"):
|
||||||
await self.user_management_commands.cmd_block(message)
|
await self.user_management_commands.cmd_block(message)
|
||||||
elif content.startswith(".unblock"):
|
elif content.startswith(".unblock"):
|
||||||
|
|
|
@ -21,17 +21,19 @@ class Selfbot(discord.Client):
|
||||||
self.tracking_handler = TrackingHandler(self)
|
self.tracking_handler = TrackingHandler(self)
|
||||||
self.presence_handler = PresenceHandler(self)
|
self.presence_handler = PresenceHandler(self)
|
||||||
|
|
||||||
|
# Add to SelfBot.__init__
|
||||||
|
from bot.cogs.cog_manager import CogManager
|
||||||
|
|
||||||
|
# In the __init__ method:
|
||||||
|
self.cog_manager = CogManager(self)
|
||||||
|
self.cog_manager.load_all_cogs()
|
||||||
|
|
||||||
async def on_ready(self):
|
async def on_ready(self):
|
||||||
print(f"Logged in as {self.user}")
|
print(f"Logged in as {self.user}")
|
||||||
|
|
||||||
async def on_message(self, message):
|
async def on_message(self, message):
|
||||||
# Skip notifications for muted channels
|
# Don't use muted_channels anymore since we're using Discord's native functionality
|
||||||
if (message.channel.id in self.user_management_commands.muted_channels and
|
# Instead, just process all messages
|
||||||
message.author != self.user):
|
|
||||||
# Still process the message, but don't trigger notifications
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Regular message handling
|
|
||||||
await self.message_handler.handle_message(message)
|
await self.message_handler.handle_message(message)
|
||||||
|
|
||||||
async def on_message_delete(self, message):
|
async def on_message_delete(self, message):
|
||||||
|
|
Loading…
Reference in a new issue