139 lines
4.5 KiB
Python
139 lines
4.5 KiB
Python
import asyncio
|
|
import logging
|
|
import random
|
|
|
|
import discord
|
|
|
|
from botbotbot.ai import AIBot
|
|
from botbotbot.shuffle import Shuffler
|
|
from botbotbot.tts import CambAI
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VoiceBot:
|
|
def __init__(
|
|
self,
|
|
bot: discord.Bot,
|
|
cambai: CambAI | None = None,
|
|
aibot: AIBot | None = None,
|
|
shuffler: Shuffler | None = None,
|
|
guild_ids: list[int] = [],
|
|
) -> None:
|
|
self.bot = bot
|
|
self.cambai = cambai
|
|
self.shf = shuffler
|
|
self.aibot = aibot
|
|
self.guild_ids = guild_ids
|
|
|
|
def init_events(self) -> None:
|
|
self.bot.add_listener(self.on_voice_state_update, "on_voice_state_update")
|
|
self.bot.add_listener(self.random_connect, "on_ready")
|
|
self.bot.add_application_command(
|
|
discord.SlashCommand(
|
|
self.join_voice,
|
|
name="join",
|
|
description="Rejoindre un channel vocal",
|
|
guild_ids=self.guild_ids,
|
|
)
|
|
)
|
|
|
|
async def random_connect(self) -> None:
|
|
while True:
|
|
logger.info("Random connect.")
|
|
for g_id in self.guild_ids:
|
|
guild = self.bot.get_guild(g_id)
|
|
if guild is None or random.random() > 10 / 100:
|
|
continue
|
|
|
|
voice_chan = random.choice(guild.voice_channels)
|
|
await self.connect_voice(voice_chan)
|
|
await asyncio.sleep(60)
|
|
|
|
async def connect_voice(
|
|
self, channel: discord.VoiceChannel | discord.StageChannel
|
|
) -> None:
|
|
if self.cambai is None:
|
|
return
|
|
|
|
logger.info(f"Connecting to voice channel <{channel}>.")
|
|
if len(channel.members) == 0:
|
|
vo = await self.connect_voice_chan(channel)
|
|
await asyncio.sleep(10)
|
|
await vo.disconnect()
|
|
return
|
|
elif len(channel.members) == 1:
|
|
member = channel.members[0].display_name
|
|
else:
|
|
member = (
|
|
", ".join(member.display_name for member in channel.members[:-1])
|
|
+ " et "
|
|
+ channel.members[-1].display_name
|
|
)
|
|
|
|
script = None
|
|
if self.aibot is not None and random.random() < 20 / 100:
|
|
script = self.aibot.answer("Dis un truc aléatoire et original.")
|
|
if script is None:
|
|
script = random.choice(
|
|
[
|
|
"Salut la jeunesse !",
|
|
f"Salut {member}, ça va bien ?",
|
|
"Allo ? À l'huile !",
|
|
]
|
|
)
|
|
source = await discord.FFmpegOpusAudio.from_probe(self.cambai.tts(script))
|
|
vo = await self.connect_voice_chan(channel)
|
|
|
|
await vo.play(source, wait_finish=True)
|
|
await vo.disconnect()
|
|
|
|
async def connect_voice_chan(
|
|
self, channel: discord.VoiceChannel | discord.StageChannel
|
|
) -> discord.VoiceClient:
|
|
for vc in self.bot.voice_clients:
|
|
if isinstance(vc, discord.VoiceClient) and vc.guild == channel.guild:
|
|
await vc.disconnect()
|
|
return await channel.connect()
|
|
|
|
async def join_voice(self, ctx: discord.ApplicationContext) -> None:
|
|
if ctx.user.voice:
|
|
await ctx.respond(
|
|
f"Connecting to voice channel {ctx.user.voice.channel}.",
|
|
ephemeral=True,
|
|
delete_after=30,
|
|
)
|
|
await self.connect_voice(ctx.user.voice.channel)
|
|
else:
|
|
await ctx.respond(
|
|
"You are not connected to a voice channel.",
|
|
ephemeral=True,
|
|
delete_after=30,
|
|
)
|
|
|
|
async def on_voice_state_update(
|
|
self,
|
|
member: discord.Member,
|
|
before: discord.VoiceState,
|
|
after: discord.VoiceState,
|
|
) -> None:
|
|
logger.debug("Voice state update")
|
|
logger.debug(before.channel)
|
|
logger.debug(after.channel)
|
|
if after.channel:
|
|
logger.debug(after.channel.members)
|
|
|
|
if (
|
|
self.cambai is not None
|
|
and before.channel is None
|
|
and after.channel is not None
|
|
and self.bot not in after.channel.members
|
|
and self.bot.user
|
|
and member.id != self.bot.user.id
|
|
and random.random() < 5 / 100
|
|
):
|
|
await self.connect_voice(after.channel)
|
|
|
|
if self.shf and before.channel is None and random.random() < 5 / 100:
|
|
logger.info(f"Voice shuffle from {member}")
|
|
await self.shf.try_shuffle(member.guild)
|