"""Discord text-to-speech bot.

Messages posted in configured text channels by users who opted in are
synthesised with edge-tts and played into a voice channel through ffmpeg.
Per-guild channel configuration is persisted to ``guild_settings.json``;
per-user voices, display names and opt-in state live in memory only.
"""

import asyncio
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from dotenv import load_dotenv

# Kept ahead of the remaining imports, as in the original module, so that the
# .env file is loaded into the environment before anything else runs.
load_dotenv()

import discord
import edge_tts
from discord import app_commands
from discord.ext import commands, tasks

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

with open("voices.json", "r", encoding="utf-8") as f:
    voices_data = json.load(f)

# Generated audio files are written here and removed after playback.
Path("audio_cache").mkdir(exist_ok=True)

SETTINGS_FILE = "guild_settings.json"


def load_guild_settings() -> dict:
    """Return the persisted guild settings, or an empty dict if none exist."""
    if os.path.exists(SETTINGS_FILE):
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


def save_guild_settings():
    """Write the persistent part of ``guild_settings`` to ``SETTINGS_FILE``.

    Only the ``tts_channels`` and ``bound_vc`` keys of each guild are saved.
    """
    saveable = {
        gid: {
            "tts_channels": data.get("tts_channels", []),
            "bound_vc": data.get("bound_vc", None),
        }
        for gid, data in guild_settings.items()
    }
    with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
        json.dump(saveable, f, indent=2)


# Persistent settings, keyed by str(guild_id) because they round-trip via JSON.
guild_settings: dict = load_guild_settings()

# In-memory runtime state, keyed by integer guild id.
voice_clients: dict[int, discord.VoiceClient] = {}
last_activity: dict[int, datetime] = {}
user_voices: dict[int, dict[int, str]] = {}
tts_queues: dict[int, asyncio.Queue] = {}
tts_enabled: dict[int, set[int]] = {}
last_speaker: dict[int, int] = {}
user_display_names: dict[int, dict[int, str]] = {}
# One worker task per guild. Holding the reference keeps the task from being
# garbage-collected and lets enqueue() tell whether a worker is still running.
queue_workers: dict[int, asyncio.Task] = {}

intents = discord.Intents.default()
intents.message_content = True
intents.voice_states = True

# on_message is overridden below and never calls process_commands, so this
# prefix is not used for text commands; the bot is driven by slash commands.
bot = commands.Bot(command_prefix="\x00", intents=intents)


def get_guild(guild_id: int) -> dict:
    """Return the settings dict for ``guild_id``, creating defaults if absent."""
    key = str(guild_id)
    if key not in guild_settings:
        guild_settings[key] = {"tts_channels": [], "bound_vc": None}
    return guild_settings[key]


def get_display_name(guild_id: int, member: discord.Member) -> str:
    """Return the member's custom TTS name, or their display name if unset."""
    return user_display_names.get(guild_id, {}).get(member.id, member.display_name)


def get_user_voice(guild_id: int, user_id: int) -> str:
    """Return the voice id chosen by the user, or the configured default."""
    return user_voices.get(guild_id, {}).get(user_id, config["tts"]["default_voice"])


def set_user_voice(guild_id: int, user_id: int, voice_id: str):
    """Remember ``voice_id`` as the user's voice in this guild (in memory)."""
    user_voices.setdefault(guild_id, {})[user_id] = voice_id


def find_voice(name: str, language: str | None = None):
    """Look up a voice by name or id, case-insensitively.

    Args:
        name: Voice name or voice id to search for.
        language: When given, only voices listed under this language key are
            searched; when ``None`` every language is searched.

    Returns:
        ``(voice_dict, language_key)`` for the first match, or
        ``(None, None)`` when nothing matches.
    """
    wanted = name.lower()
    catalog = voices_data["voices"]
    if language is None:
        candidates = list(catalog.items())
    else:
        candidates = [(language, catalog.get(language, []))]
    for lang, voices in candidates:
        for v in voices:
            if v["name"].lower() == wanted or v["id"].lower() == wanted:
                return v, lang
    return None, None


def ffmpeg_options() -> dict:
    """Build the keyword arguments passed to ``discord.FFmpegPCMAudio``."""
    return {
        "options": (
            f"-vn -b:a {config['audio']['bitrate']}k "
            f"-ar {config['audio']['sample_rate']} -ac 2 "
            f'-af "loudnorm=I=-16:TP=-1.5:LRA=11,'
            f'acompressor=threshold=-20dB:ratio=4:attack=5:release=50"'
        ),
    }


async def generate_tts(text: str, voice_id: str, path: str) -> bool:
    """Synthesise ``text`` with edge-tts and save it to ``path``.

    Returns:
        ``True`` on success, ``False`` if synthesis or saving raised.
    """
    try:
        comm = edge_tts.Communicate(
            text, voice_id, rate=config["tts"]["rate"], pitch=config["tts"]["pitch"]
        )
        await comm.save(path)
        return True
    except Exception as e:
        logger.error(f"TTS error: {e}")
        return False


async def cleanup(path: str, delay: int = 8):
    """Delete ``path`` after ``delay`` seconds; a missing file is not an error."""
    await asyncio.sleep(delay)
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        # Best-effort: a leftover cache file must not break playback.
        logger.warning(f"Could not remove {path}: {e}")


async def notify_member(member: discord.Member, message: str):
    """DM ``member`` on a best-effort basis.

    Sending can fail (for example when the member has DMs closed); that must
    not abort the queue worker, so HTTP errors are logged and swallowed.
    """
    try:
        await member.send(message)
    except discord.HTTPException as e:
        logger.warning(f"Could not DM {member}: {e}")


async def ensure_connected(
    guild: discord.Guild, member: discord.Member
) -> discord.VoiceClient | None:
    """Return a voice client connected to the channel TTS should play in.

    The guild's bound voice channel (set via /setup-vc) takes priority; if
    none is bound, or it no longer resolves to a voice channel, the member's
    current voice channel is used.

    Returns:
        The connected voice client, or ``None`` if there is no target channel
        or connecting/moving failed.
    """
    target = None
    bound_id = get_guild(guild.id).get("bound_vc")
    if bound_id:
        channel = guild.get_channel(bound_id)
        if isinstance(channel, discord.VoiceChannel):
            target = channel

    if target is None:
        if not member.voice or not member.voice.channel:
            return None
        target = member.voice.channel

    vc = voice_clients.get(guild.id)
    try:
        if vc and vc.is_connected():
            if vc.channel.id != target.id:
                await vc.move_to(target)
            return vc
        vc = await target.connect()
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        return None

    voice_clients[guild.id] = vc
    return vc


async def enqueue(
    guild: discord.Guild, member: discord.Member, text: str, voice_id: str
):
    """Queue a message for speech and make sure exactly one worker is running."""
    q = tts_queues.setdefault(guild.id, asyncio.Queue())
    await q.put((text, voice_id, member))

    # Checking the task (rather than the queue size) avoids starting a second
    # worker while the first one is busy with an item it already dequeued.
    worker = queue_workers.get(guild.id)
    if worker is None or worker.done():
        queue_workers[guild.id] = asyncio.create_task(queue_worker(guild))


async def _speak(
    guild: discord.Guild, member: discord.Member, text: str, voice_id: str
):
    """Generate and start playing one queued message, DMing the member on failure."""
    vc = await ensure_connected(guild, member)
    if vc is None:
        await notify_member(
            member,
            "Could not join a voice channel. Make sure you are in a voice "
            "channel (or the bound VC if one is set).",
        )
        return

    last_activity[guild.id] = datetime.now()
    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = f"audio_cache/{guild.id}_{ts}.mp3"

    ok = await generate_tts(text, voice_id, path)
    if not ok:
        # Remove whatever partial file the failed synthesis may have left.
        await cleanup(path, delay=0)
        await notify_member(
            member, "Failed to generate TTS audio. The TTS service may be unavailable."
        )
        return

    # Wait for the previous message to finish before starting this one.
    while vc.is_playing():
        await asyncio.sleep(0.1)

    loop = asyncio.get_running_loop()

    def _after_playback(error):
        # Called by discord.py from the player thread once playback ends, so
        # the file is removed after it has actually been read, however long
        # the message was. The short delay lets ffmpeg release the file.
        if error:
            logger.error(f"Playback error: {error}")
        asyncio.run_coroutine_threadsafe(cleanup(path, delay=1), loop)

    try:
        source = discord.FFmpegPCMAudio(path, **ffmpeg_options())
        vc.play(source, after=_after_playback)
    except Exception as e:
        logger.error(f"Playback error: {e}")
        # Playback never started, so the after-callback will not clean up.
        await cleanup(path, delay=0)
        await notify_member(
            member, "Failed to play audio. Check that ffmpeg is installed."
        )


async def queue_worker(guild: discord.Guild):
    """Drain the guild's TTS queue, speaking one message at a time."""
    q = tts_queues.get(guild.id)
    if q is None:
        return
    while not q.empty():
        text, voice_id, member = await q.get()
        try:
            await _speak(guild, member, text, voice_id)
        except Exception:
            # One bad item must not strand the rest of the queue.
            logger.exception("Unexpected error while processing a TTS message")
        finally:
            q.task_done()


@bot.event
async def on_ready():
    """Start the inactivity loop and sync slash commands."""
    logger.info(f"Logged in as {bot.user}")
    # on_ready can fire again after a reconnect; starting a running loop raises.
    if not check_inactivity.is_running():
        check_inactivity.start()
    try:
        synced = await bot.tree.sync()
        logger.info(f"Synced {len(synced)} commands")
    except Exception as e:
        logger.error(f"Sync error: {e}")


@bot.event
async def on_message(message: discord.Message):
    """Queue messages from opted-in users in TTS channels for speech."""
    if message.author.bot or not message.guild:
        return

    gs = get_guild(message.guild.id)
    if message.channel.id not in gs.get("tts_channels", []):
        return

    text = message.content.strip()
    if not text or text.startswith("/"):
        return

    enabled = tts_enabled.get(message.guild.id, set())
    if message.author.id not in enabled:
        return

    if not message.author.voice or not message.author.voice.channel:
        await message.reply("You need to be in a voice channel.", mention_author=False)
        return

    if len(text) > config["tts"]["max_length"]:
        text = text[: config["tts"]["max_length"]]

    voice_id = get_user_voice(message.guild.id, message.author.id)

    # Announce the speaker's name only when the speaker changes.
    prev = last_speaker.get(message.guild.id)
    last_speaker[message.guild.id] = message.author.id
    if prev != message.author.id:
        name = get_display_name(message.guild.id, message.author)
        text = f"{name}: {text}"

    await enqueue(message.guild, message.author, text, voice_id)


@bot.event
async def on_voice_state_update(member: discord.Member, before, after):
    """Leave the voice channel once only bots remain in it."""
    if member.bot:
        return
    vc = voice_clients.get(member.guild.id)
    if vc and vc.is_connected():
        if all(m.bot for m in vc.channel.members):
            await vc.disconnect()
            voice_clients.pop(member.guild.id, None)
            last_activity.pop(member.guild.id, None)
            logger.info(f"Auto-disconnected from {member.guild.name}")


@tasks.loop(seconds=30)
async def check_inactivity():
    """Disconnect from guilds whose last TTS activity exceeds the timeout."""
    now = datetime.now()
    timeout = timedelta(seconds=config["bot"]["inactivity_timeout"])
    stale = [gid for gid, t in last_activity.items() if now - t > timeout]
    for gid in stale:
        vc = voice_clients.get(gid)
        if vc:
            try:
                await vc.disconnect()
            except Exception as e:
                # Still drop the bookkeeping below even if disconnect failed.
                logger.warning(f"Inactivity disconnect failed for {gid}: {e}")
        voice_clients.pop(gid, None)
        last_activity.pop(gid, None)
        logger.info(f"Inactivity disconnect: {gid}")


def admin_only():
    """Return a slash-command check requiring the Manage Server permission."""
    return app_commands.checks.has_permissions(manage_guild=True)


@bot.tree.command(
    name="setup-channel", description="Add or remove a TTS text channel (admin)"
)
@app_commands.describe(channel="Text channel", action="add or remove")
@app_commands.choices(
    action=[
        app_commands.Choice(name="add", value="add"),
        app_commands.Choice(name="remove", value="remove"),
    ]
)
@admin_only()
async def setup_channel(
    interaction: discord.Interaction, channel: discord.TextChannel, action: str
):
    """Add or remove a text channel from the guild's TTS channel list and persist it."""
    gs = get_guild(interaction.guild_id)
    ids: list = gs.setdefault("tts_channels", [])

    if action == "add":
        if channel.id in ids:
            await interaction.response.send_message(
                f"#{channel.name} is already a TTS channel.", ephemeral=True
            )
            return
        ids.append(channel.id)
        save_guild_settings()
        await interaction.response.send_message(
            f"#{channel.name} set as TTS channel.", ephemeral=True
        )
    else:
        if channel.id not in ids:
            await interaction.response.send_message(
                f"#{channel.name} is not a TTS channel.", ephemeral=True
            )
            return
        ids.remove(channel.id)
        save_guild_settings()
        await interaction.response.send_message(
            f"#{channel.name} removed from TTS channels.", ephemeral=True
        )


@bot.tree.command(
    name="setup-vc", description="Bind or unbind the bot to a voice channel (admin)"
)
@app_commands.describe(channel="Voice channel to bind to", action="bind or unbind")
@app_commands.choices(
    action=[
        app_commands.Choice(name="bind", value="bind"),
        app_commands.Choice(name="unbind", value="unbind"),
    ]
)
@admin_only()
async def setup_vc(
    interaction: discord.Interaction, action: str, channel: discord.VoiceChannel = None
):
    """Set or clear the guild's bound voice channel and persist the change."""
    gs = get_guild(interaction.guild_id)

    if action == "bind":
        if channel is None:
            await interaction.response.send_message(
                "Provide a voice channel to bind to.", ephemeral=True
            )
            return
        gs["bound_vc"] = channel.id
        save_guild_settings()
        await interaction.response.send_message(
            f"Bot will always join {channel.name} for TTS.", ephemeral=True
        )
    else:
        gs["bound_vc"] = None
        save_guild_settings()
        await interaction.response.send_message(
            "VC binding removed. Bot will follow the user's VC.", ephemeral=True
        )


@bot.tree.command(name="setup-status", description="Show current TTS config (admin)")
@admin_only()
async def setup_status(interaction: discord.Interaction):
    """Reply with an embed listing the guild's TTS channels and bound voice channel."""
    gs = get_guild(interaction.guild_id)
    tts_channels = gs.get("tts_channels", [])
    bound_vc = gs.get("bound_vc")

    channel_mentions = [f"<#{c}>" for c in tts_channels] or ["none"]
    vc_mention = f"<#{bound_vc}>" if bound_vc else "none (follows user)"

    embed = discord.Embed(title="TTS Config", color=discord.Color.blurple())
    embed.add_field(
        name="TTS Channels", value="\n".join(channel_mentions), inline=False
    )
    embed.add_field(name="Bound VC", value=vc_mention, inline=False)
    await interaction.response.send_message(embed=embed, ephemeral=True)


@bot.tree.command(name="voice", description="Set your TTS voice")
@app_commands.describe(language="Language", voice="Voice name")
async def voice_cmd(interaction: discord.Interaction, language: str, voice: str):
    """Store the caller's TTS voice after validating it against the chosen language."""
    if language not in voices_data["voices"]:
        langs = ", ".join(voices_data["voices"].keys())
        await interaction.response.send_message(
            f"Unknown language. Options: {langs}", ephemeral=True
        )
        return

    # Restrict the lookup to the selected language so that the result agrees
    # with the option list shown in the error message below.
    v, lang = find_voice(voice, language)
    if v is None:
        available = ", ".join(x["name"] for x in voices_data["voices"][language])
        await interaction.response.send_message(
            f"Unknown voice. Options for {language}: {available}", ephemeral=True
        )
        return

    set_user_voice(interaction.guild_id, interaction.user.id, v["id"])
    await interaction.response.send_message(
        f"Voice set to {v['name']} ({lang})", ephemeral=True
    )


@voice_cmd.autocomplete("language")
async def autocomplete_language(interaction: discord.Interaction, current: str):
    """Suggest up to 25 language keys containing the typed text."""
    return [
        app_commands.Choice(name=lang, value=lang)
        for lang in voices_data["voices"]
        if current.lower() in lang.lower()
    ][:25]


@voice_cmd.autocomplete("voice")
async def autocomplete_voice(interaction: discord.Interaction, current: str):
    """Suggest up to 25 voices of the already-entered language matching the typed text."""
    language = next(
        (
            o["value"]
            for o in interaction.data.get("options", [])
            if o["name"] == "language"
        ),
        None,
    )
    if not language or language not in voices_data["voices"]:
        return []
    return [
        app_commands.Choice(name=v["name"], value=v["name"])
        for v in voices_data["voices"][language]
        if current.lower() in v["name"].lower()
    ][:25]


@bot.tree.command(name="stop", description="Stop TTS and disconnect")
async def stop(interaction: discord.Interaction):
    """Stop playback, discard queued messages and leave the voice channel."""
    gid = interaction.guild_id
    vc = voice_clients.get(gid)
    if not vc or not vc.is_connected():
        await interaction.response.send_message(
            "Not in a voice channel.", ephemeral=True
        )
        return

    if vc.is_playing():
        vc.stop()

    q = tts_queues.get(gid)
    if q:
        while not q.empty():
            try:
                q.get_nowait()
            except asyncio.QueueEmpty:
                break
            q.task_done()

    await vc.disconnect()
    voice_clients.pop(gid, None)
    last_activity.pop(gid, None)
    await interaction.response.send_message("Stopped and disconnected.", ephemeral=True)


@bot.tree.command(
    name="tts-enable", description="Enable TTS for your messages in this server"
)
async def tts_enable(interaction: discord.Interaction):
    """Opt the caller in to TTS for this guild (in memory)."""
    gs = get_guild(interaction.guild_id)
    if not gs.get("tts_channels"):
        await interaction.response.send_message(
            "No TTS channels have been configured for this server yet.", ephemeral=True
        )
        return

    enabled = tts_enabled.setdefault(interaction.guild_id, set())
    if interaction.user.id in enabled:
        await interaction.response.send_message(
            "TTS is already enabled for you.", ephemeral=True
        )
        return

    enabled.add(interaction.user.id)
    await interaction.response.send_message(
        "TTS enabled. Your messages in TTS channels will now be spoken.", ephemeral=True
    )


@bot.tree.command(name="tts-name", description="Set the name TTS reads out for you")
@app_commands.describe(
    name="Name to be read out (leave blank to reset to your display name)"
)
async def tts_name(interaction: discord.Interaction, name: str = None):
    """Set or reset the name announced before the caller's messages."""
    names = user_display_names.setdefault(interaction.guild_id, {})
    if name is None:
        names.pop(interaction.user.id, None)
        await interaction.response.send_message(
            "Name reset to your display name.", ephemeral=True
        )
    else:
        names[interaction.user.id] = name
        await interaction.response.send_message(
            f'TTS will now call you "{name}".', ephemeral=True
        )


@bot.tree.command(
    name="tts-disable", description="Disable TTS for your messages in this server"
)
async def tts_disable(interaction: discord.Interaction):
    """Opt the caller out of TTS for this guild."""
    enabled = tts_enabled.get(interaction.guild_id, set())
    if interaction.user.id not in enabled:
        await interaction.response.send_message(
            "TTS is not enabled for you.", ephemeral=True
        )
        return
    enabled.discard(interaction.user.id)
    await interaction.response.send_message("TTS disabled.", ephemeral=True)


@bot.tree.command(name="join", description="Join your current voice channel")
async def join(interaction: discord.Interaction):
    """Connect to, or move to, the caller's current voice channel."""
    if not interaction.user.voice or not interaction.user.voice.channel:
        await interaction.response.send_message(
            "You are not in a voice channel.", ephemeral=True
        )
        return

    target = interaction.user.voice.channel
    vc = voice_clients.get(interaction.guild_id)

    if vc and vc.is_connected():
        if vc.channel.id == target.id:
            await interaction.response.send_message(
                "Already in your voice channel.", ephemeral=True
            )
            return
        await vc.move_to(target)
        await interaction.response.send_message(
            f"Moved to {target.name}.", ephemeral=True
        )
        return

    try:
        vc = await target.connect()
        voice_clients[interaction.guild_id] = vc
        last_activity[interaction.guild_id] = datetime.now()
        await interaction.response.send_message(
            f"Joined {target.name}.", ephemeral=True
        )
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        await interaction.response.send_message(
            f"Could not join {target.name}: {e}", ephemeral=True
        )


@bot.tree.command(name="skip", description="Skip the current TTS message")
async def skip(interaction: discord.Interaction):
    """Stop the message that is currently playing, if any."""
    vc = voice_clients.get(interaction.guild_id)
    if vc and vc.is_playing():
        vc.stop()
        await interaction.response.send_message("Skipped.", ephemeral=True)
    else:
        await interaction.response.send_message("Nothing is playing.", ephemeral=True)


if __name__ == "__main__":
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        logger.error("DISCORD_TOKEN not set")
        # SystemExit instead of the site-provided exit() helper, which is not
        # guaranteed to exist in every interpreter configuration.
        raise SystemExit(1)
    bot.run(token)