# Source code for bobbot.discord_helpers.activity_manager

"""Manages the bot's activity status."""

import asyncio
import os
import uuid
from pathlib import Path
from typing import Callable, Optional

import discord
from discord import app_commands
from discord.ext import commands

from bobbot.activities import (
    Activity,
    configure_chess,
    configure_hangman,
    configure_school,
    get_activity,
    get_activity_status,
    spectate_activity,
    start_activity,
    stop_activity,
)
from bobbot.agents import extract_answers, get_response
from bobbot.discord_helpers.main_bot import bot, lazy_send_message
from bobbot.discord_helpers.text_channel_history import (
    TextChannelHistory,
    get_channel_history,
)
from bobbot.memory import is_sparse_encoder_loaded
from bobbot.utils import get_logger, is_playwright_browser_open, on_heroku

logger = get_logger(__name__)
# Bookkeeping for questions that are waiting on a user reply. Both dicts are keyed by the same
# per-request UUID string; command_handler() inserts into both together and removes both after its wait.
waiting_cmd_events: dict[str, asyncio.Event] = {}  # Event that check_waiting_responses() sets once answered
waiting_responses: dict[str, str] = {}  # Question text while pending; overwritten with the answer text
# Coarse state shared by the spectate commands below ("idle" when nothing is being spectated).
spectate_status: str = "idle"


async def command_handler(
    channel: discord.TextChannel,
    command: str,
    expect_response: bool = False,
    output_directly: bool = False,
    hide_output: bool = False,
    use_history: bool = True,
) -> Optional[str]:
    """Handle a command from the current activity.

    Commands are directions to Bob, with info or requests to be relayed to the user.
    If expect_response is True, the user's response will be waited for and returned.
    Otherwise, the response given to the user is returned.

    Args:
        channel: The channel the command is associated with.
        command: The command to give to Bob.
        expect_response: Whether to wait for the user's response.
        output_directly: Whether to send the literal command directly to the user.
        hide_output: Whether to avoid sending the message to the user (just return it instead).
        use_history: Whether to include any channel history.

    Returns:
        The user's answer when expect_response is True; otherwise the message produced for the
        user (the literal command when output_directly is True). None when the command contains
        "start_spectating", which only triggers spectating.
    """
    # Command-specific handlers
    if "start_spectating" in command:
        # NOTE(review): spectate is the hybrid command defined in this module; it is called here with
        # the channel standing in for the command context - confirm it only relies on .send().
        await spectate(channel)
        return None

    if output_directly:
        reply = command
        logger.info(f"Direct response: {command}")
    else:
        # Refresh the channel history, then let Bob phrase the command in character.
        history: TextChannelHistory = get_channel_history(channel)
        await history.aupdate()
        prior_msgs = history.as_langchain_msgs(bot.user) if use_history else []
        reply = await get_response(prior_msgs, context=command, store_memories=False)
        logger.info(f"Command: {command} -> Response: {reply}")

    if not hide_output:
        await lazy_send_message(channel, reply, force=True)
    if not expect_response:
        return reply

    # Register the question and block until check_waiting_responses() records an answer.
    request_id = str(uuid.uuid4())
    answered = asyncio.Event()
    waiting_cmd_events[request_id] = answered
    waiting_responses[request_id] = command
    logger.info(f"Waiting for response to '{command}'...")
    try:
        await answered.wait()
    finally:
        # Always unregister, even if the wait is cancelled, so an abandoned question is not kept
        # around and matched against later messages.
        waiting_cmd_events.pop(request_id, None)
        answer = waiting_responses.pop(request_id, None)
    return answer
def gen_command_handler(channel: discord.TextChannel) -> Callable:
    """Build a command handler that is bound to the given channel.

    Args:
        channel: The channel every command passed to the returned handler is associated with.

    Returns:
        A coroutine function taking the command string plus any keyword options, which it
        forwards unchanged to command_handler.
    """

    async def _bound_command_handler(command: str, **kwargs) -> Optional[str]:
        """Relay a command from the current activity to command_handler for the bound channel."""
        result = await command_handler(channel, command, **kwargs)
        return result

    return _bound_command_handler
async def check_waiting_responses(channel: discord.TextChannel) -> None:
    """Check if any waiting responses were answered.

    Passes the recent channel history and the pending questions to extract_answers. For each
    result, either records the answer and wakes the waiting command_handler call, or relays the
    returned clarification text to the user.

    Args:
        channel: The channel whose history is checked and where clarifications are sent.
    """
    if not waiting_cmd_events:
        return  # No waiting responses
    history: TextChannelHistory = get_channel_history(channel)

    # Snapshot ids and questions together so they cannot drift apart, and leave out questions whose
    # event is already set: their waiting_responses entry now holds the answer, not the question.
    pending_ids = [
        request_id
        for request_id, event in waiting_cmd_events.items()
        if request_id in waiting_responses and not event.is_set()
    ]
    if not pending_ids:
        return
    questions = [waiting_responses[request_id] for request_id in pending_ids]

    answers = await extract_answers(history.as_string(10), questions)
    for question_num, (is_answer, text) in answers.items():
        index = question_num - 1  # Question numbers are 1-based
        if not 0 <= index < len(pending_ids):
            # Ignore numbers that do not map to a question we asked (0 would otherwise wrap around).
            logger.info(f"Ignoring result for unknown question number {question_num}")
            continue
        request_id = pending_ids[index]
        event = waiting_cmd_events.get(request_id)
        if event is None or event.is_set():
            continue  # Answered (or abandoned) by another call while this one was awaiting
        question = waiting_responses[request_id]
        if is_answer:
            logger.info(f"Answer to '{question}': {text}")
            waiting_responses[request_id] = text
            event.set()
        else:
            logger.info(f"Clarification for '{question}': {text}")
            await command_handler(channel, text)  # Send clarification
# NOTE: The docstrings of the commands below are kept verbatim on purpose. discord.py can use a
# command's docstring for its help text and slash-command descriptions, so rewording them would
# change what users see. Implementation notes therefore live in `#` comments instead.


# ===== Chess and League Commands =====
@bot.hybrid_command(name="chess")
@app_commands.choices(
    against=[
        app_commands.Choice(name="Human", value="human"),
        app_commands.Choice(name="Chess.com Bot", value="bot"),
    ]
)
async def chess(ctx: commands.Context, elo: int, against: Optional[str]) -> None:
    """Start a chess game with Bob.

    Args:
        ctx: The context of the command.
        elo: The elo rating to play at. Must be in 200-1600.
        against: Whether to play against a human or bot. Defaults to a human.
    """
    if not 200 <= elo <= 1600:
        await ctx.send("! invalid elo, must be between 200 and 1600")
        return
    against_computer: bool = against is not None and against.lower() == "bot"
    configure_chess(elo, against_computer)
    opponent = "a bot" if against_computer else f"u <@{ctx.author.id}>"
    await ctx.send(f"ok, ill play chess at {elo} elo vs {opponent}, lets go!")
    await start_activity(Activity.CHESS, gen_command_handler(ctx.channel))


# ===== Hangman Commands =====
async def _refuse_if_work_blocked(ctx: commands.Context) -> bool:
    """Tell the work-blocked user to get back to work; return True if the request was refused.

    The block applies only while the WORK_BLOCK environment variable is set, and only to the
    author whose id matches the hard-coded residue check below.
    """
    if os.getenv("WORK_BLOCK") and ctx.author.id % 1000000007 == 380204424:
        await ctx.send(f"nah do ur work <@{ctx.author.id}>... u got this!")
        return True
    return False


@bot.hybrid_command(name="hangman")
async def hangman(ctx: commands.Context, *, theme: str) -> None:
    """Start a hangman game with Bob.

    Args:
        ctx: The context of the command.
        theme: The theme to play hangman with.
    """
    if await _refuse_if_work_blocked(ctx):
        return
    configure_hangman(theme, new_only_hint=None)
    await ctx.send(f"! reset... ok, ill play hangman with u <@{ctx.author.id}>, lets go!")
    await start_activity(Activity.HANGMAN, gen_command_handler(ctx.channel))


@bot.hybrid_command(name="timedhangman")
async def timed_hangman(ctx: commands.Context, *, theme: str) -> None:
    """Start a timed hangman game with Bob.

    Args:
        ctx: The context of the command.
        theme: The theme to play hangman with.
    """
    if await _refuse_if_work_blocked(ctx):
        return
    configure_hangman(theme, new_only_hint=True, timed=True)
    await ctx.send(f"! reset... ok, ill play timed hangman with u <@{ctx.author.id}>, lets go!")
    await start_activity(Activity.HANGMAN, gen_command_handler(ctx.channel))


@bot.hybrid_command(name="customhangman")
async def custom_hangman(
    ctx: commands.Context,
    theme: str,
    hint_prompt: Optional[str] = None,
    only_hint: bool = True,
    timed: bool = True,
    helpfulness_mult: float = 1.0,
    time_per_answer: int = 30,
) -> None:
    """Start a custom hangman game with Bob.

    Args:
        ctx: The context of the command.
        theme: The theme to play hangman with.
        hint_prompt: The type of hint to give.
        only_hint: Whether to only show the hint (no blanks or letters).
        timed: Whether the game should be timed.
        helpfulness_mult: Multiplier adjustment for hint helpfulness.
        time_per_answer: The amount of time given for each round.
    """
    if await _refuse_if_work_blocked(ctx):
        return
    configure_hangman(
        theme,
        timed=timed,
        new_hint_prompt=hint_prompt,
        new_only_hint=only_hint,
        helpfulness_mult=helpfulness_mult,
        time_per_answer=time_per_answer,
    )
    await ctx.send(f"! reset... ok, ill play custom hangman with u <@{ctx.author.id}>, lets go :)")
    await start_activity(Activity.HANGMAN, gen_command_handler(ctx.channel))


# ===== Activity Group Commands =====
@bot.hybrid_command(name="accountability")
async def accountability(ctx: commands.Context, task: str, duration: float) -> None:
    """Start an accountability task with Bob.

    Args:
        ctx: The context of the command.
        task: The task to do.
        duration: The duration of the task in minutes.
    """
    configure_school(duration, task)
    await ctx.send(
        f"ok, go do '{task}' for the next {round(duration)} minutes <@{ctx.author.id}>. ill check on u :)"
    )
    await start_activity(Activity.SCHOOL, gen_command_handler(ctx.channel))


@bot.hybrid_group(name="activity", fallback="status")
async def activity(ctx: commands.Context) -> None:
    """Check Bob's current activity status."""
    status = await get_activity_status()
    await ctx.send(
        f"! {status}\nplaywright: {is_playwright_browser_open()}, sparse encoder: {is_sparse_encoder_loaded()}"
    )


@activity.command(name="start")
@app_commands.choices(
    activity=[
        app_commands.Choice(name="School", value="school"),
        app_commands.Choice(name="Eat", value="eat"),
        app_commands.Choice(name="Shower", value="shower"),
        app_commands.Choice(name="Sleep", value="sleep"),
        app_commands.Choice(name="Chess", value="chess"),
        # app_commands.Choice(name="League", value="league"),
        app_commands.Choice(name="Hangman", value="hangman"),
    ]
)
async def do_basic_activity(ctx: commands.Context, activity: str) -> None:
    """Start an activity with default parameters."""
    try:
        if activity == "chess":
            # Chess needs configuring first, so delegate to the chess command with default settings.
            await chess(ctx, 800, "human")
            return
        elif activity == "hangman":
            if await _refuse_if_work_blocked(ctx):
                return
        act = Activity(activity)  # Raises ValueError for a name that is not an Activity value
        await ctx.send(f"ok i {activity} now")
        await start_activity(act, gen_command_handler(ctx.channel))
    except ValueError:
        await ctx.send("! invalid activity, try school, eat, shower, sleep, chess, league, or hangman")


@activity.command(name="stop")
async def discord_stop_activity(ctx: commands.Context) -> None:
    """Stops the current activity."""
    if get_activity() is None:
        await ctx.send("! not in an activity")
    else:
        await stop_activity()
        await ctx.send("! stopped activity")


# ===== Spectate Commands =====
# spectate_status values used by the two commands below:
#   "idle"       - nobody is spectating
#   "spectating" - a spectate loop is running (or a waiting call is about to start one)
#   "stopping"   - the user asked the running loop to stop; the loop resets the status to "idle"
#   "restarting" - a new spectate call is waiting for the running loop to exit and hand over
@bot.hybrid_group(name="spectate", fallback="start")
async def spectate(ctx: commands.Context, video: bool = True) -> None:
    """Start spectating the current activity.

    If on Heroku, video mode is disabled. Either uses a low quality/frame rate video or a screenshot.
    If given messages, sends them instead.

    Args:
        ctx: The context of the command.
        video: Whether to spectate in video mode. Has no effect on Heroku.
    """
    global spectate_status
    if on_heroku():
        video = False  # Not enough memory to do video spectating on Heroku
    RATE = 1.5  # Need to slow down editing rate for video mode
    # command_handler() calls this with a bare text channel instead of a Context; a channel has
    # .send() but no .channel attribute, so fall back to the object itself.
    channel = getattr(ctx, "channel", ctx)

    if spectate_status in ("stopping", "restarting"):
        await ctx.send("! too fast, try again in a bit")
        return
    elif spectate_status == "spectating":
        # Ask the running loop to exit and wait for it to hand over. This uses a different marker
        # than a user stop so the exiting loop can tell whether anyone is waiting to take over.
        spectate_status = "restarting"
        while spectate_status == "restarting":
            await asyncio.sleep(1)
    spectate_status = "spectating"

    curr_message: Optional[discord.Message] = None
    image_or_msg: Optional[list[str] | Path] = await spectate_activity()  # Image or list of messages
    if image_or_msg is None:
        await ctx.send("! no activity D:")
        spectate_status = "idle"
        return

    try:
        frame_num = 1
        while spectate_status == "spectating":
            if isinstance(image_or_msg, Path):
                content = f"Spectating: (Frame {frame_num})" if video else None
                if curr_message is not None:  # Edit previous message
                    await curr_message.edit(
                        content=content, attachments=[discord.File(fp=image_or_msg, filename="spectate.jpeg")]
                    )
                else:
                    curr_message = await ctx.send(
                        content=content, file=discord.File(fp=image_or_msg, filename="spectate.jpeg")
                    )
                frame_num += 1
                await asyncio.sleep(RATE)  # Slow down editing rate
            elif isinstance(image_or_msg, list):
                await ctx.send(image_or_msg[0])
                for msg in image_or_msg[1:]:
                    await asyncio.sleep(1)
                    await lazy_send_message(channel, msg, instant=True, force=True)
                break
            else:
                # Neither an image nor messages: there is nothing left to show.
                if curr_message is not None:
                    await curr_message.edit(content="Done spectating.")
                else:
                    await ctx.send("Done spectating.")
                break
            if not video:
                break
            image_or_msg = await spectate_activity()
    except Exception:
        logger.exception("Error during spectating")
        await ctx.send("! error during spectating")
    finally:
        # Reset in `finally` so a failure while reporting the error cannot leave the status stuck.
        if spectate_status == "restarting":
            spectate_status = "spectating"  # Let the next spectate start
        else:
            # Finished normally or stopped by the user. Going back to "idle" (rather than
            # "spectating") keeps the next spectate call from waiting on a loop that is gone.
            spectate_status = "idle"


@spectate.command(name="stop")
async def discord_stop_spectating(ctx: commands.Context) -> None:
    """Stop spectating the current activity."""
    global spectate_status
    if spectate_status == "spectating":
        spectate_status = "stopping"
        await ctx.send("! ok D:")
    else:
        await ctx.send("! but ur not spectating anything D:")