430 lines
16 KiB
Python
430 lines
16 KiB
Python
import asyncio
|
|
import time
|
|
import inspect
|
|
import traceback
|
|
from datetime import datetime
|
|
|
|
class TestCommands:
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.tests = {
|
|
"afk": self.test_afk,
|
|
"remindme": self.test_remindme,
|
|
"commands": self.test_commands,
|
|
"storage": self.test_storage,
|
|
"health": self.test_health,
|
|
"all": self.test_all,
|
|
}
|
|
|
|
async def cmd_test(self, message):
|
|
"""Run self-tests on the bot"""
|
|
parts = message.content.split(" ", 1)
|
|
test_name = parts[1] if len(parts) > 1 else "all"
|
|
|
|
if test_name not in self.tests:
|
|
await message.channel.send(
|
|
f"Unknown test '{test_name}'. Available tests: {', '.join(self.tests.keys())}"
|
|
)
|
|
return
|
|
|
|
# Create a status message without silent flag for better visibility
|
|
status_msg = await message.channel.send(f"🔄 Running test: {test_name}...")
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
if test_name == "all":
|
|
# For "all" tests, update status more frequently
|
|
await status_msg.edit(content="🔄 Initializing test suite...")
|
|
results = await self.test_all(status_msg)
|
|
else:
|
|
test_func = self.tests[test_name]
|
|
results = await test_func(status_msg)
|
|
|
|
elapsed = time.time() - start_time
|
|
|
|
# Format results into a nice report
|
|
report = self._format_test_report(results, elapsed)
|
|
|
|
# Make sure report isn't too long for Discord
|
|
if len(report) > 2000:
|
|
report = report[:1997] + "..."
|
|
|
|
# Update status with results
|
|
await status_msg.edit(content=report)
|
|
|
|
except Exception as e:
|
|
error_msg = f"❌ Test failed with error:\n\n{traceback.format_exc()[:1500]}\n"
|
|
print(f"Test error: {str(e)}")
|
|
await status_msg.edit(content=error_msg)
|
|
|
|
def _format_test_report(self, results, elapsed):
|
|
"""Format test results into a readable report"""
|
|
if isinstance(results, dict):
|
|
# We have multiple test suites
|
|
total_passed = sum(r['passed'] for r in results.values())
|
|
total_failed = sum(r['failed'] for r in results.values())
|
|
total_tests = total_passed + total_failed
|
|
|
|
report = f"# Self-Test Report ({elapsed:.2f}s)\n\n"
|
|
report += f"✅ **{total_passed}/{total_tests}** tests passed\n"
|
|
|
|
if total_failed > 0:
|
|
report += f"❌ **{total_failed}** tests failed\n\n"
|
|
else:
|
|
report += "\n"
|
|
|
|
# Add individual test suite results
|
|
for suite_name, suite_result in results.items():
|
|
passed = suite_result['passed']
|
|
failed = suite_result['failed']
|
|
total = passed + failed
|
|
status = "✅" if failed == 0 else "⚠️"
|
|
|
|
report += f"{status} **{suite_name}**: {passed}/{total} passed\n"
|
|
|
|
# Add failure details if any
|
|
if failed > 0 and 'failures' in suite_result:
|
|
report += "\n"
|
|
for failure in suite_result['failures']:
|
|
report += f" ❌ {failure}\n"
|
|
report += "\n"
|
|
else:
|
|
# Single test suite
|
|
passed = results['passed']
|
|
failed = results['failed']
|
|
total = passed + failed
|
|
|
|
report = f"# Test Results ({elapsed:.2f}s)\n\n"
|
|
report += f"✅ **{passed}/{total}** tests passed\n"
|
|
|
|
if failed > 0:
|
|
report += f"❌ **{failed}** tests failed\n\n"
|
|
report += "\n"
|
|
for failure in results.get('failures', []):
|
|
report += f" ❌ {failure}\n"
|
|
report += "\n"
|
|
|
|
return report
|
|
|
|
async def test_all(self, status_msg):
|
|
"""Run all available tests"""
|
|
results = {}
|
|
test_funcs = [v for k, v in self.tests.items() if k != "all"]
|
|
|
|
for i, test_func in enumerate(test_funcs):
|
|
test_name = test_func.__name__.replace('test_', '')
|
|
try:
|
|
# Update status before each test
|
|
await status_msg.edit(content=f"🔄 Running tests ({i+1}/{len(test_funcs)}): {test_name}...")
|
|
results[test_name] = await test_func(status_msg)
|
|
# Quick status after each test
|
|
passed = results[test_name]['passed']
|
|
failed = results[test_name]['failed']
|
|
await status_msg.edit(content=f"🔄 Test {test_name}: ✅{passed} ❌{failed} | Continuing tests...")
|
|
except Exception as e:
|
|
results[test_name] = {'passed': 0, 'failed': 1, 'failures': [f"Exception: {str(e)}"]}
|
|
await status_msg.edit(content=f"⚠️ Error in test {test_name}, continuing with next test...")
|
|
|
|
return results
|
|
|
|
async def test_afk(self, status_msg):
|
|
"""Test AFK functionality"""
|
|
results = {'passed': 0, 'failed': 0, 'failures': []}
|
|
|
|
# Save original state
|
|
original_afk = self.bot.AFK_STATUS
|
|
original_notified = self.bot.AFK_NOTIFIED_USERS.copy()
|
|
|
|
try:
|
|
# Test 1: Enable AFK
|
|
self.bot.AFK_STATUS = False
|
|
await self.bot.afk_commands.cmd_afk(status_msg)
|
|
|
|
if self.bot.AFK_STATUS:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to enable AFK mode")
|
|
|
|
# Test 2: Disable AFK
|
|
await self.bot.afk_commands.cmd_unafk(status_msg)
|
|
|
|
if not self.bot.AFK_STATUS:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to disable AFK mode")
|
|
|
|
# Test 3: AFK notification list clears
|
|
if len(self.bot.AFK_NOTIFIED_USERS) == 0:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("AFK notified users list not cleared")
|
|
|
|
finally:
|
|
# Restore original state
|
|
self.bot.AFK_STATUS = original_afk
|
|
self.bot.AFK_NOTIFIED_USERS = original_notified
|
|
|
|
return results
|
|
|
|
async def test_remindme(self, status_msg):
|
|
"""Test reminder functionality with a very short reminder"""
|
|
results = {'passed': 0, 'failed': 0, 'failures': []}
|
|
|
|
# Create a test reminder message
|
|
test_content = ".remindme 1s Test reminder"
|
|
mock_message = MockMessage(
|
|
author=self.bot.user,
|
|
content=test_content,
|
|
channel=status_msg.channel
|
|
)
|
|
|
|
# Set up a flag to verify the reminder was triggered
|
|
reminder_triggered = False
|
|
original_reply = mock_message.reply
|
|
|
|
async def mock_reply(content, **kwargs):
|
|
nonlocal reminder_triggered
|
|
if "Reminder:" in content:
|
|
reminder_triggered = True
|
|
return await original_reply(content, **kwargs)
|
|
|
|
mock_message.reply = mock_reply
|
|
|
|
try:
|
|
# Test reminder setup
|
|
await self.bot.utility_commands.cmd_remindme(mock_message)
|
|
|
|
# Wait for the reminder to trigger (slightly more than 1s)
|
|
await asyncio.sleep(1.5)
|
|
|
|
if reminder_triggered:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Reminder did not trigger")
|
|
|
|
except Exception as e:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Reminder test error: {str(e)}")
|
|
|
|
return results
|
|
|
|
async def test_commands(self, status_msg):
|
|
"""Test custom command functionality"""
|
|
results = {'passed': 0, 'failed': 0, 'failures': []}
|
|
|
|
# Test command name
|
|
test_cmd_name = "__test_cmd__"
|
|
test_cmd_path = f"commands/{test_cmd_name}.py"
|
|
|
|
try:
|
|
# Test 1: Add a command
|
|
add_cmd_msg = MockMessage(
|
|
author=self.bot.user,
|
|
content=f".addcmd {test_cmd_name} return await msg.reply('test success', silent=True)",
|
|
channel=status_msg.channel
|
|
)
|
|
|
|
await self.bot.admin_commands.cmd_addcmd(add_cmd_msg)
|
|
|
|
if test_cmd_name in self.bot.loaded_commands:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to add test command")
|
|
|
|
# Test 2: Execute the command
|
|
if test_cmd_name in self.bot.loaded_commands:
|
|
try:
|
|
test_msg = MockMessage(
|
|
author=self.bot.user,
|
|
content="test content",
|
|
channel=status_msg.channel
|
|
)
|
|
|
|
reply_received = False
|
|
original_reply = test_msg.reply
|
|
|
|
async def test_reply(content, **kwargs):
|
|
nonlocal reply_received
|
|
if content == "test success":
|
|
reply_received = True
|
|
return await original_reply(content, **kwargs)
|
|
|
|
test_msg.reply = test_reply
|
|
|
|
await self.bot.loaded_commands[test_cmd_name](test_msg)
|
|
|
|
if reply_received:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Command execution failed")
|
|
|
|
except Exception as e:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Command execution error: {str(e)}")
|
|
|
|
# Test 3: Delete the command
|
|
del_cmd_msg = MockMessage(
|
|
author=self.bot.user,
|
|
content=f".delcmd {test_cmd_name}",
|
|
channel=status_msg.channel
|
|
)
|
|
|
|
await self.bot.admin_commands.cmd_delcmd(del_cmd_msg)
|
|
|
|
if test_cmd_name not in self.bot.loaded_commands:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to delete test command")
|
|
|
|
except Exception as e:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Command test error: {str(e)}")
|
|
|
|
# Clean up any leftovers
|
|
import os
|
|
if os.path.exists(test_cmd_path):
|
|
try:
|
|
os.remove(test_cmd_path)
|
|
self.bot.reload_commands()
|
|
except:
|
|
pass
|
|
|
|
return results
|
|
|
|
async def test_storage(self, status_msg):
|
|
"""Test channel tracking functionality"""
|
|
results = {'passed': 0, 'failed': 0, 'failures': []}
|
|
|
|
# Save original state
|
|
original_tracked = self.bot.tracked_channels.copy()
|
|
|
|
try:
|
|
# Test channel ID that likely doesn't exist
|
|
test_channel_id = 1349095349905788968
|
|
|
|
# Make sure it's not in the tracked channels
|
|
if test_channel_id in self.bot.tracked_channels:
|
|
self.bot.tracked_channels.remove(test_channel_id)
|
|
|
|
# Test 1: Add a tracked channel
|
|
self.bot.tracked_channels.append(test_channel_id)
|
|
from utils.storage import save_tracked_channels
|
|
save_tracked_channels(self.bot.tracked_channels)
|
|
|
|
# Test 2: Load tracked channels
|
|
from utils.storage import load_tracked_channels
|
|
loaded_channels = load_tracked_channels()
|
|
|
|
if test_channel_id in loaded_channels:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to save/load tracked channel")
|
|
|
|
# Test 3: Remove tracked channel
|
|
if test_channel_id in self.bot.tracked_channels:
|
|
self.bot.tracked_channels.remove(test_channel_id)
|
|
save_tracked_channels(self.bot.tracked_channels)
|
|
|
|
loaded_channels = load_tracked_channels()
|
|
if test_channel_id not in loaded_channels:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Failed to remove tracked channel")
|
|
|
|
finally:
|
|
# Restore original state
|
|
self.bot.tracked_channels = original_tracked
|
|
from utils.storage import save_tracked_channels
|
|
save_tracked_channels(self.bot.tracked_channels)
|
|
|
|
return results
|
|
|
|
async def test_health(self, status_msg):
|
|
"""Test basic bot health and connectivity"""
|
|
results = {'passed': 0, 'failed': 0, 'failures': []}
|
|
|
|
# Test 1: Check if the bot is logged in
|
|
if self.bot.user is not None:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append("Bot is not logged in")
|
|
|
|
# Test 2: Check discord API connection by getting client latency
|
|
try:
|
|
latency = self.bot.latency
|
|
if isinstance(latency, float):
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Invalid latency value: {latency}")
|
|
except Exception as e:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Failed to get latency: {str(e)}")
|
|
|
|
# Test 3: Test message sending/editing (core functionality)
|
|
try:
|
|
test_msg = await status_msg.channel.send(
|
|
"Test message - will be deleted", silent=False
|
|
)
|
|
await test_msg.edit(content="Test message edited - will be deleted")
|
|
await test_msg.delete()
|
|
results['passed'] += 1
|
|
except Exception as e:
|
|
results['failed'] += 1
|
|
results['failures'].append(f"Message operations failed: {str(e)}")
|
|
|
|
return results
|
|
|
|
def _assert(self, results, condition, message):
|
|
"""Helper method for assertions in tests"""
|
|
if condition:
|
|
results['passed'] += 1
|
|
else:
|
|
results['failed'] += 1
|
|
results['failures'].append(message)
|
|
return condition
|
|
|
|
class MockMessage:
|
|
"""A mock message class for testing"""
|
|
def __init__(self, author, content, channel):
|
|
self.author = author
|
|
self.content = content
|
|
self.channel = channel
|
|
self.id = int(time.time() * 1000)
|
|
self.created_at = datetime.now()
|
|
|
|
async def reply(self, content, **kwargs):
|
|
"""Mock reply method"""
|
|
return await self.channel.send(f"Reply to {self.id}: {content}", **kwargs)
|
|
|
|
async def edit(self, **kwargs):
|
|
"""Mock edit method"""
|
|
self.content = kwargs.get('content', self.content)
|
|
return self
|
|
|
|
async def cmd_test_debug(self, message):
|
|
"""Run a simple debug test to verify command functionality"""
|
|
try:
|
|
# Send a simple message that should always work
|
|
debug_msg = await message.channel.send("🔍 Debug test initiated...")
|
|
|
|
# Wait a moment
|
|
await asyncio.sleep(1)
|
|
|
|
# Edit the message
|
|
await debug_msg.edit(content="✅ Debug test successful - message editing works!")
|
|
|
|
except Exception as e:
|
|
# If this fails, there's a fundamental issue
|
|
await message.channel.send(f"❌ Debug test failed: {str(e)}") |