Source code for bobbot.discord_helpers.message_manager

"""Manages messages received by the bot."""

import asyncio
from datetime import datetime, timedelta, timezone

import discord
from discord.ext import commands
from langchain.docstore.document import Document

from bobbot.activities import (
    Activity,
    get_activity,
    get_activity_status,
    hangman_on_message,
)
from bobbot.agents import (  # decide_to_respond,
    check_openai_safety,
    get_response,
    get_response_with_tools,
)
from bobbot.discord_helpers.activity_manager import check_waiting_responses
from bobbot.discord_helpers.main_bot import bot, lazy_send_message
from bobbot.discord_helpers.text_channel_history import (
    TextChannelHistory,
    get_channel_history,
)
from bobbot.memory import is_sparse_encoder_loaded, query_memories
from bobbot.utils import (
    get_logger,
    is_playwright_browser_open,
    log_debug_info,
    on_heroku,
    reset_debug_info,
    time_elapsed_str,
    truncate_length,
)

logger = get_logger(__name__)


[docs] @bot.event async def on_message(message: discord.Message, use_perplexity: bool = False, use_reasoning: bool = False): """Respond to messages, using or not using smart search (Perplexity) and reasoning.""" # Only respond to messages in DMs and specified channels if not (use_perplexity or message.channel.id in bot.CHANNELS or isinstance(message.channel, discord.DMChannel)): return curr_channel: discord.TextChannel = message.channel # Set the active channel bot.active_channel = message.channel await bot.process_commands(message) # Don't respond if the bot is off, or if it's a command message if message.content.startswith(bot.command_prefix): return elif not bot.is_on: if bot.user in message.mentions: await lazy_send_message( curr_channel, "! im off rn, see help with /help, turn on with /config on", instant=True, force=True ) return # For now, don't respond to self messages if not use_perplexity and not use_reasoning and message.author == bot.user: return # Call play hangman if activity is hangman and not pinged if get_activity() == Activity.HANGMAN and bot.user not in message.mentions: response = await hangman_on_message(message.content) if response: await lazy_send_message(message.channel, response) return # Don't respond further unless pinged or in a DM if not ( use_perplexity or use_reasoning or bot.user in message.mentions or isinstance(message.channel, discord.DMChannel) ): return # Check for research or reasoning queries if "-research" in message.content.strip().split(): use_perplexity = True if "-reason" in message.content.strip().split(): use_reasoning = True if use_perplexity and use_reasoning: use_reasoning = False # Perplexity includes reasoning # Get history for the current channel history: TextChannelHistory = get_channel_history(curr_channel) await history.aupdate() history.clear_users_typing() saved_message_count: int = history.message_count # Get a response if on_heroku(): logger.info( f"Before message, playwright: {is_playwright_browser_open()}, sparse encoder: {is_sparse_encoder_loaded()}" ) try: reset_debug_info() short_history: str = history.as_string(5) # decision, thoughts = await decide_to_respond(short_history) # Bypass decision agent (due to pings) # if decision is False: # return async with curr_channel.typing(): asyncio.create_task(check_waiting_responses(curr_channel)) is_safe = await check_openai_safety(short_history) heroku_override = False if on_heroku() and get_activity() == Activity.CHESS: heroku_override = True # Can't query memories while playing chess on Heroku elif on_heroku() and bot.voice_clients: heroku_override = True # Can't query memories while in a voice channel on Heroku if on_heroku(): logger.info(f"Heroku memory saver override: {heroku_override}") # Don't use memories if unsafe or using Perplexity/reasoning if not bot.is_incognito and not heroku_override and is_safe and not use_perplexity and not use_reasoning: # Find relevant memories using varying methods EACH_LIMIT = 2 MAX_MEMORIES = 4 # Run all memory queries in parallel tiny_history: str = history.as_string(3) only_message: str = history.as_string(1, with_author=False, with_context=False, with_reactions=False) ( memories_0, memories_1, memories_2, memories_3, memories_4, memories_5, memories_6, ) = await asyncio.gather( query_memories(short_history, limit=EACH_LIMIT), # More context query_memories(tiny_history, limit=EACH_LIMIT), # Some context query_memories( tiny_history, limit=EACH_LIMIT, age_limit=timedelta(hours=1) ), # Some context, recent only query_memories(only_message, limit=EACH_LIMIT), # Last message with no author query_memories( only_message, limit=EACH_LIMIT, age_limit=timedelta(hours=1) ), # Last message with no author, recent only query_memories( tiny_history, limit=EACH_LIMIT, ignore_recent=False, only_tools=True ), # Relevant tool calls, some context query_memories( tiny_history, limit=EACH_LIMIT, ignore_recent=False, only_tools=True, age_limit=timedelta(hours=1), ), # Relevant tool calls, some context, recent only ) # Fetch up to MAX_MEMORIES memories using a rough heuristic order, removing duplicates memory_lists = [memories_6, memories_1, memories_4, memories_3, memories_5, memories_0, memories_2] memories: list[Document] = [] for i in range(EACH_LIMIT): for j, memory_list in enumerate(memory_lists): if i < len(memory_list) and len(memories) < MAX_MEMORIES: memory = memory_list[i] if memory.metadata["id"] not in [m.metadata["id"] for m in memories]: memories.append(memory) if len(memories) == MAX_MEMORIES: logger.info(f"Hit max of {MAX_MEMORIES} memories on iter {i}, memory list {j}.") # Format memories as strings formatted_memories: list[str] = [] for memory in memories: timestamp = memory.metadata["creation_time"] time_str = time_elapsed_str(datetime.fromtimestamp(timestamp, tz=timezone.utc)) mem_title = f"({time_str})" mem_content = truncate_length(memory.page_content, 2048) formatted_memories.append(f"{mem_title}\n{mem_content}") if formatted_memories: log_debug_info( truncate_length( f"===== Bob memories (found {len(memories)}) =====\n{'\n'.join([truncate_length(m, 192) for m in formatted_memories])}", # noqa: E501 1000, ) ) else: formatted_memories = [] # Get activity status activity_status = await get_activity_status() has_activity_status = "You're free right now." not in activity_status if has_activity_status: log_debug_info(f"===== Bob activity status =====\n{activity_status}") # Provide context context = "Here is some context that may be helpful." has_context = False if formatted_memories: has_context = True context += "\n\nMemories from past conversations and tool calls (be wary of outdated info):\n" context += "\n\n".join(formatted_memories) if has_activity_status: has_context = True context += f"\n\nYour status: {activity_status}" context = context.strip() if has_context else None # logger.info(f"Context:\n{context}") # Get response and send message if not use_perplexity and not use_reasoning: response: str = await get_response_with_tools( history.as_langchain_msgs(bot.user), context=context, uncensored=not is_safe, obedient=bot.is_obedient or not is_safe, store_memories=not bot.is_incognito and not heroku_override, ) else: response: str = await get_response( history.as_langchain_msgs(bot.user), context=context, obedient=bot.is_obedient or not is_safe, store_memories=not bot.is_incognito and not heroku_override, use_perplexity=use_perplexity, use_reasoning=use_reasoning, ) if history.message_count == saved_message_count: await lazy_send_message(message.channel, response) except Exception as e: logger.exception("Error getting response") await lazy_send_message(message.channel, str(e), instant=True, force=True)
@bot.hybrid_command(name="research") async def research(ctx: commands.Context, *, query: str) -> None: """Return a response using research from online.""" # if ctx.interaction is not None: # message = await ctx.send(f'researching query: "{query}"') # else: # return message = await ctx.send(f'researching query: "{query}"') await on_message(message, use_perplexity=True) @bot.hybrid_command(name="reason") async def reason(ctx: commands.Context, *, query: str) -> None: """Return a response using reasoning.""" # if ctx.interaction is not None: # message = await ctx.send(f'reasoning through query: "{query}"') # else: # return message = await ctx.send(f'researching query: "{query}"') await on_message(message, use_reasoning=True)