selfbot-legacy/bot/commands/test_commands.py

430 lines
16 KiB
Python
Raw Normal View History

2025-05-07 16:44:50 +02:00
import asyncio
import time
import inspect
import traceback
from datetime import datetime
class TestCommands:
    """Chat-triggered self-test suites for the bot.

    Each suite is a coroutine method that receives the status message being
    edited during the run and returns a result dict of the form
    ``{'passed': int, 'failed': int, 'failures': [str, ...]}``. ``test_all``
    instead returns a mapping of suite name to such a result dict.
    """

    # Longest report that is sent as-is; longer reports are truncated.
    MAX_REPORT_LENGTH = 2000

    def __init__(self, bot):
        """Store the bot and register the available suites by name."""
        self.bot = bot
        self.tests = {
            "afk": self.test_afk,
            "remindme": self.test_remindme,
            "commands": self.test_commands,
            "storage": self.test_storage,
            "health": self.test_health,
            "all": self.test_all,
        }

    async def cmd_test(self, message):
        """Run self-tests on the bot.

        The text after the command word in ``message.content`` selects the
        suite; when it is missing, ``"all"`` is used. A status message is sent
        to ``message.channel`` and then edited with progress and finally with
        the formatted report (or with a traceback excerpt if the run raised).
        """
        # split() without a separator tolerates doubled or trailing spaces,
        # so ".test " means "all" and ".test  afk " means "afk".
        parts = message.content.split(maxsplit=1)
        test_name = parts[1].strip().lower() if len(parts) > 1 else "all"
        if test_name not in self.tests:
            await message.channel.send(
                f"Unknown test '{test_name}'. Available tests: {', '.join(self.tests)}"
            )
            return
        # Create a status message without silent flag for better visibility
        status_msg = await message.channel.send(f"🔄 Running test: {test_name}...")
        # Monotonic clock: the elapsed time cannot be skewed by clock changes.
        start_time = time.monotonic()
        try:
            if test_name == "all":
                # For "all" tests, update status more frequently
                await status_msg.edit(content="🔄 Initializing test suite...")
            results = await self.tests[test_name](status_msg)
            elapsed = time.monotonic() - start_time
            report = self._format_test_report(results, elapsed)
            # Make sure report isn't too long for Discord
            if len(report) > self.MAX_REPORT_LENGTH:
                report = report[:self.MAX_REPORT_LENGTH - 3] + "..."
            await status_msg.edit(content=report)
        except Exception as e:
            error_msg = f"❌ Test failed with error:\n\n{traceback.format_exc()[:1500]}\n"
            print(f"Test error: {str(e)}")
            await status_msg.edit(content=error_msg)

    def _format_test_report(self, results, elapsed):
        """Format test results into a readable report.

        Args:
            results: Either one suite result
                (``{'passed', 'failed', 'failures'}``) or a mapping of suite
                name to suite result, as returned by ``test_all``.
            elapsed: Run duration in seconds, shown with two decimals.

        Returns:
            The report text.
        """
        # Both shapes are dicts, so the type alone cannot tell them apart:
        # a collection of suites has only dict values, while a single suite
        # result holds ints and a list.
        is_suite_collection = isinstance(results, dict) and all(
            isinstance(value, dict) for value in results.values()
        )
        if is_suite_collection:
            total_passed = sum(r['passed'] for r in results.values())
            total_failed = sum(r['failed'] for r in results.values())
            total_tests = total_passed + total_failed
            parts = [
                f"# Self-Test Report ({elapsed:.2f}s)\n\n",
                f"✅ **{total_passed}/{total_tests}** tests passed\n",
            ]
            if total_failed > 0:
                parts.append(f"❌ **{total_failed}** tests failed\n\n")
            else:
                parts.append("\n")
            # Add individual test suite results
            for suite_name, suite_result in results.items():
                passed = suite_result['passed']
                failed = suite_result['failed']
                status = "✅" if failed == 0 else "⚠️"
                parts.append(
                    f"{status} **{suite_name}**: {passed}/{passed + failed} passed\n"
                )
                # Add failure details if any
                if failed > 0 and 'failures' in suite_result:
                    parts.append("\n")
                    parts.extend(f"{failure}\n" for failure in suite_result['failures'])
                    parts.append("\n")
            return "".join(parts)

        # Single test suite
        passed = results['passed']
        failed = results['failed']
        parts = [
            f"# Test Results ({elapsed:.2f}s)\n\n",
            f"✅ **{passed}/{passed + failed}** tests passed\n",
        ]
        if failed > 0:
            parts.append(f"❌ **{failed}** tests failed\n\n")
            parts.append("\n")
            parts.extend(f"{failure}\n" for failure in results.get('failures', []))
            parts.append("\n")
        return "".join(parts)

    async def test_all(self, status_msg):
        """Run every registered suite except ``"all"`` itself.

        ``status_msg`` is edited before and after each suite. A suite that
        raises is recorded as one failure carrying the exception text, and the
        run continues with the next suite.

        Returns:
            dict mapping suite name (the key in ``self.tests``) to its result.
        """
        results = {}
        suites = [(name, func) for name, func in self.tests.items() if name != "all"]
        for position, (test_name, test_func) in enumerate(suites, start=1):
            try:
                # Update status before each test
                await status_msg.edit(
                    content=f"🔄 Running tests ({position}/{len(suites)}): {test_name}..."
                )
                results[test_name] = await test_func(status_msg)
                # Quick status after each test
                passed = results[test_name]['passed']
                failed = results[test_name]['failed']
                await status_msg.edit(
                    content=f"🔄 Test {test_name}: ✅{passed} ❌{failed} | Continuing tests..."
                )
            except Exception as e:
                results[test_name] = {
                    'passed': 0,
                    'failed': 1,
                    'failures': [f"Exception: {str(e)}"],
                }
                await status_msg.edit(
                    content=f"⚠️ Error in test {test_name}, continuing with next test..."
                )
        return results

    async def test_afk(self, status_msg):
        """Test AFK functionality.

        Calls ``bot.afk_commands.cmd_afk`` and ``cmd_unafk`` with
        ``status_msg`` and checks ``bot.AFK_STATUS`` and
        ``bot.AFK_NOTIFIED_USERS`` afterwards. Both attributes are restored to
        their prior values when the suite ends, even on error.
        """
        results = {'passed': 0, 'failed': 0, 'failures': []}
        # Save original state
        original_afk = self.bot.AFK_STATUS
        original_notified = self.bot.AFK_NOTIFIED_USERS.copy()
        try:
            # Test 1: Enable AFK
            self.bot.AFK_STATUS = False
            await self.bot.afk_commands.cmd_afk(status_msg)
            self._assert(results, self.bot.AFK_STATUS, "Failed to enable AFK mode")
            # Test 2: Disable AFK
            await self.bot.afk_commands.cmd_unafk(status_msg)
            self._assert(results, not self.bot.AFK_STATUS, "Failed to disable AFK mode")
            # Test 3: AFK notification list clears
            self._assert(
                results,
                len(self.bot.AFK_NOTIFIED_USERS) == 0,
                "AFK notified users list not cleared",
            )
        finally:
            # Restore original state
            self.bot.AFK_STATUS = original_afk
            self.bot.AFK_NOTIFIED_USERS = original_notified
        return results

    async def test_remindme(self, status_msg):
        """Test reminder functionality with a very short reminder.

        Passes a ``MockMessage`` containing ``.remindme 1s Test reminder`` to
        ``bot.utility_commands.cmd_remindme``, waits 1.5 seconds, and passes
        if the mock's ``reply`` was called with text containing
        ``"Reminder:"`` in the meantime.
        """
        results = {'passed': 0, 'failed': 0, 'failures': []}
        mock_message = MockMessage(
            author=self.bot.user,
            content=".remindme 1s Test reminder",
            channel=status_msg.channel,
        )
        # Wrap reply() so the suite can observe the reminder being delivered.
        reminder_triggered = False
        original_reply = mock_message.reply

        async def mock_reply(content, **kwargs):
            nonlocal reminder_triggered
            if "Reminder:" in content:
                reminder_triggered = True
            return await original_reply(content, **kwargs)

        mock_message.reply = mock_reply
        try:
            # Test reminder setup
            await self.bot.utility_commands.cmd_remindme(mock_message)
            # Wait for the reminder to trigger (slightly more than 1s)
            await asyncio.sleep(1.5)
            self._assert(results, reminder_triggered, "Reminder did not trigger")
        except Exception as e:
            results['failed'] += 1
            results['failures'].append(f"Reminder test error: {str(e)}")
        return results

    async def test_commands(self, status_msg):
        """Test custom command functionality.

        Adds a throwaway command through ``bot.admin_commands.cmd_addcmd``,
        runs it from ``bot.loaded_commands``, and removes it through
        ``cmd_delcmd``. Afterwards, if the command file is still on disk, it
        is deleted and ``bot.reload_commands()`` is called.
        """
        import os

        results = {'passed': 0, 'failed': 0, 'failures': []}
        test_cmd_name = "__test_cmd__"
        test_cmd_path = f"commands/{test_cmd_name}.py"
        try:
            # Test 1: Add a command
            add_cmd_msg = MockMessage(
                author=self.bot.user,
                content=f".addcmd {test_cmd_name} return await msg.reply('test success', silent=True)",
                channel=status_msg.channel,
            )
            await self.bot.admin_commands.cmd_addcmd(add_cmd_msg)
            command_added = self._assert(
                results,
                test_cmd_name in self.bot.loaded_commands,
                "Failed to add test command",
            )
            # Test 2: Execute the command
            if command_added:
                try:
                    test_msg = MockMessage(
                        author=self.bot.user,
                        content="test content",
                        channel=status_msg.channel,
                    )
                    reply_received = False
                    original_reply = test_msg.reply

                    async def test_reply(content, **kwargs):
                        nonlocal reply_received
                        if content == "test success":
                            reply_received = True
                        return await original_reply(content, **kwargs)

                    test_msg.reply = test_reply
                    await self.bot.loaded_commands[test_cmd_name](test_msg)
                    self._assert(results, reply_received, "Command execution failed")
                except Exception as e:
                    results['failed'] += 1
                    results['failures'].append(f"Command execution error: {str(e)}")
            # Test 3: Delete the command
            del_cmd_msg = MockMessage(
                author=self.bot.user,
                content=f".delcmd {test_cmd_name}",
                channel=status_msg.channel,
            )
            await self.bot.admin_commands.cmd_delcmd(del_cmd_msg)
            self._assert(
                results,
                test_cmd_name not in self.bot.loaded_commands,
                "Failed to delete test command",
            )
        except Exception as e:
            results['failed'] += 1
            results['failures'].append(f"Command test error: {str(e)}")
        finally:
            # Clean up any leftovers. Best effort: a cleanup problem is
            # logged rather than raised so it cannot mask the suite result.
            if os.path.exists(test_cmd_path):
                try:
                    os.remove(test_cmd_path)
                    self.bot.reload_commands()
                except Exception as e:
                    print(f"Test cleanup error: {str(e)}")
        return results

    async def test_storage(self, status_msg):
        """Test channel tracking functionality.

        Round-trips a test channel id through ``save_tracked_channels`` /
        ``load_tracked_channels`` from ``utils.storage``: first added, then
        removed. The original ``bot.tracked_channels`` list is restored and
        saved again when the suite ends, even on error.
        """
        from utils.storage import load_tracked_channels, save_tracked_channels

        results = {'passed': 0, 'failed': 0, 'failures': []}
        # Save original state
        original_tracked = self.bot.tracked_channels.copy()
        try:
            # Test channel ID that likely doesn't exist
            test_channel_id = 1349095349905788968
            # Make sure it's not in the tracked channels
            if test_channel_id in self.bot.tracked_channels:
                self.bot.tracked_channels.remove(test_channel_id)
            # Test 1: Add a tracked channel and persist it
            self.bot.tracked_channels.append(test_channel_id)
            save_tracked_channels(self.bot.tracked_channels)
            # Test 2: Load tracked channels
            self._assert(
                results,
                test_channel_id in load_tracked_channels(),
                "Failed to save/load tracked channel",
            )
            # Test 3: Remove tracked channel
            if test_channel_id in self.bot.tracked_channels:
                self.bot.tracked_channels.remove(test_channel_id)
            save_tracked_channels(self.bot.tracked_channels)
            self._assert(
                results,
                test_channel_id not in load_tracked_channels(),
                "Failed to remove tracked channel",
            )
        finally:
            # Restore original state
            self.bot.tracked_channels = original_tracked
            save_tracked_channels(self.bot.tracked_channels)
        return results

    async def test_health(self, status_msg):
        """Test basic bot health and connectivity.

        Checks that ``bot.user`` is set, that ``bot.latency`` is a finite
        float, and that a message can be sent, edited and deleted in
        ``status_msg.channel``.
        """
        import math

        results = {'passed': 0, 'failed': 0, 'failures': []}
        # Test 1: Check if the bot is logged in
        self._assert(results, self.bot.user is not None, "Bot is not logged in")
        # Test 2: Check discord API connection by getting client latency
        try:
            latency = self.bot.latency
            # NaN and infinity are floats too, but they are not a usable
            # measurement, so they must not count as a healthy connection.
            self._assert(
                results,
                isinstance(latency, float) and math.isfinite(latency),
                f"Invalid latency value: {latency}",
            )
        except Exception as e:
            results['failed'] += 1
            results['failures'].append(f"Failed to get latency: {str(e)}")
        # Test 3: Test message sending/editing (core functionality)
        try:
            test_msg = await status_msg.channel.send(
                "Test message - will be deleted", silent=False
            )
            try:
                await test_msg.edit(content="Test message edited - will be deleted")
            finally:
                # Delete even when the edit failed so no probe message is left behind.
                await test_msg.delete()
            results['passed'] += 1
        except Exception as e:
            results['failed'] += 1
            results['failures'].append(f"Message operations failed: {str(e)}")
        return results

    def _assert(self, results, condition, message):
        """Record one check in ``results`` and return ``condition``.

        A truthy ``condition`` increments ``results['passed']``; otherwise
        ``results['failed']`` is incremented and ``message`` is appended to
        ``results['failures']``.
        """
        if condition:
            results['passed'] += 1
        else:
            results['failed'] += 1
            results['failures'].append(message)
        return condition
class MockMessage:
    """Lightweight stand-in for a chat message, used by the self-tests.

    It carries the author, content and channel it was built with, plus a
    generated ``id`` and ``created_at``.
    """

    def __init__(self, author, content, channel):
        self.author, self.content, self.channel = author, content, channel
        # Current wall-clock time in milliseconds serves as the message id.
        self.id = int(time.time() * 1000)
        self.created_at = datetime.now()

    async def reply(self, content, **kwargs):
        """Send ``content`` to the channel, prefixed with this message's id."""
        text = f"Reply to {self.id}: {content}"
        return await self.channel.send(text, **kwargs)

    async def edit(self, **kwargs):
        """Replace ``content`` when that keyword is given; return this message."""
        if 'content' in kwargs:
            self.content = kwargs['content']
        return self
async def cmd_test_debug(self, message):
    """Check that sending and editing a message works.

    Sends a probe message to ``message.channel``, pauses for one second, then
    edits the probe to a success notice. If any step raises, the error text is
    sent to the same channel instead.
    """
    # NOTE(review): this takes ``self`` but follows MockMessage in the file;
    # the original indentation is not visible here - confirm the owning class.
    try:
        # Plain send: the most basic operation, expected to always work.
        probe = await message.channel.send("🔍 Debug test initiated...")
        # Short pause before editing.
        await asyncio.sleep(1)
        await probe.edit(content="✅ Debug test successful - message editing works!")
    except Exception as exc:
        # A failure here points at a fundamental problem with messaging.
        await message.channel.send(f"❌ Debug test failed: {exc}")