tts/bot.py
2026-03-19 08:44:23 -07:00

624 lines
19 KiB
Python

import asyncio
import json
import logging
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
import discord
import edge_tts
from discord import app_commands
from discord.ext import commands, tasks
# Configure logging once at import time; `logger` is shared by the whole module.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Static configuration, read relative to the current working directory.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)
with open("voices.json", "r", encoding="utf-8") as f:
    voices_data = json.load(f)

# Writable state (guild settings, generated audio) lives under DATA_DIR.
DATA_DIR = os.getenv("DATA_DIR", "/app/data")
Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
SETTINGS_FILE = os.path.join(DATA_DIR, "guild_settings.json")
AUDIO_CACHE_DIR = os.path.join(DATA_DIR, "audio_cache")
# Must be a single expression: with `Path` and `(AUDIO_CACHE_DIR)` on separate
# lines, `.mkdir` is looked up on the plain string and raises AttributeError.
Path(AUDIO_CACHE_DIR).mkdir(parents=True, exist_ok=True)

# One-time migration: seed DATA_DIR from /app/guild_settings.json when DATA_DIR
# has no settings file yet.
_image_settings = "/app/guild_settings.json"
if not os.path.exists(SETTINGS_FILE) and os.path.exists(_image_settings):
    try:
        shutil.copy(_image_settings, SETTINGS_FILE)
        logger.info("Migrated guild_settings.json into DATA_DIR")
    except OSError as e:
        # Best effort: the bot can still start with empty settings.
        logger.error(f"Failed to migrate settings: {e}")
def load_guild_settings() -> dict:
    """Read the persisted per-guild settings from SETTINGS_FILE.

    Returns:
        The decoded settings mapping, or an empty dict when the file is
        missing, unreadable, not valid JSON, or does not contain a JSON
        object at the top level.
    """
    if not os.path.exists(SETTINGS_FILE):
        return {}
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load settings file {SETTINGS_FILE}: {e}")
        return {}
    # Callers index the result by guild key and assign into it; anything other
    # than a dict (e.g. `[]` or `null`) would fail later, far from the cause.
    if not isinstance(loaded, dict):
        logger.error(
            f"Settings file {SETTINGS_FILE} does not contain a JSON object; ignoring it"
        )
        return {}
    return loaded
def save_guild_settings():
    """Persist the durable part of `guild_settings` to SETTINGS_FILE.

    Only the "tts_channels" and "bound_vc" keys of each guild entry are
    written. The data is written to a sibling temporary file first and then
    moved over SETTINGS_FILE, so a crash or full disk mid-write cannot leave
    a truncated settings file behind. Failures are logged, never raised.
    """
    saveable = {
        gid: {
            "tts_channels": data.get("tts_channels", []),
            "bound_vc": data.get("bound_vc", None),
        }
        for gid, data in guild_settings.items()
    }
    tmp_path = f"{SETTINGS_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(saveable, f, indent=2)
        # Same-directory rename: replaces the old file in one step.
        os.replace(tmp_path, SETTINGS_FILE)
    except Exception as e:
        logger.error(f"Failed to write settings to {SETTINGS_FILE}: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            # The temp file was never created or is already gone.
            pass
# Persisted per-guild settings, keyed by the guild ID as a string (see
# get_guild). Loaded once at import time from SETTINGS_FILE.
guild_settings: dict = load_guild_settings()
# The dictionaries below are in-memory only; save_guild_settings does not write them.
# guild ID -> the voice connection the bot opened in that guild.
voice_clients: dict[int, discord.VoiceClient] = {}
# guild ID -> timestamp of the latest TTS/join activity; read by check_inactivity.
last_activity: dict[int, datetime] = {}
# guild ID -> {user ID -> voice ID chosen with /voice}.
user_voices: dict[int, dict[int, str]] = {}
# guild ID -> queue of (text, voice_id, member) tuples waiting to be spoken.
tts_queues: dict[int, asyncio.Queue] = {}
# guild ID -> IDs of users who opted in with /tts-enable.
tts_enabled: dict[int, set[int]] = {}
# guild ID -> author ID of the most recently queued message; on_message uses it
# to decide whether to prefix the speaker's name.
last_speaker: dict[int, int] = {}
# guild ID -> {user ID -> spoken name set with /tts-name}.
user_display_names: dict[int, dict[int, str]] = {}
# Start from the default intents and explicitly enable the two this bot relies
# on: message text (on_message) and voice state changes (on_voice_state_update).
intents = discord.Intents.default()
intents.message_content = True
intents.voice_states = True
# NOTE(review): the NUL prefix presumably exists so that no typed message can
# match a prefix command; only slash commands are registered in this file.
bot = commands.Bot(command_prefix="\x00", intents=intents)
def get_guild(guild_id: int) -> dict:
    """Return the settings entry for a guild, creating a blank one on first use.

    Entries are stored in `guild_settings` under the string form of the ID.
    The returned dict is the stored object itself, so callers may mutate it.
    """
    settings_key = str(guild_id)
    try:
        return guild_settings[settings_key]
    except KeyError:
        fresh_entry = {"tts_channels": [], "bound_vc": None}
        guild_settings[settings_key] = fresh_entry
        return fresh_entry
def get_display_name(guild_id: int, member: discord.Member) -> str:
    """Return the name to read out for a member.

    A per-guild override stored in `user_display_names` wins; otherwise the
    member's own display name is used.
    """
    guild_overrides = user_display_names.get(guild_id, {})
    return guild_overrides.get(member.id, member.display_name)
def get_user_voice(guild_id: int, user_id: int) -> str:
    """Return the voice ID a user picked in a guild, or the configured default."""
    chosen_in_guild = user_voices.get(guild_id, {})
    fallback_voice = config["tts"]["default_voice"]
    return chosen_in_guild.get(user_id, fallback_voice)
def set_user_voice(guild_id: int, user_id: int, voice_id: str):
    """Remember the voice ID a user selected for a guild (in memory only)."""
    guild_choices = user_voices.setdefault(guild_id, {})
    guild_choices[user_id] = voice_id
def find_voice(name: str):
for lang, voices in voices_data["voices"].items():
for v in voices:
if v["name"].lower() == name.lower() or v["id"].lower() == name.lower():
return v, lang
return None, None
def ffmpeg_options() -> dict:
    """Build the keyword arguments passed to `discord.FFmpegPCMAudio`.

    Bitrate and sample rate come from `config["audio"]`; the audio filter
    chain (loudness normalisation followed by a compressor) is fixed here.
    """
    audio_cfg = config["audio"]
    filter_chain = ",".join(
        (
            "loudnorm=I=-16:TP=-1.5:LRA=11",
            "acompressor=threshold=-20dB:ratio=4:attack=5:release=50",
        )
    )
    flags = " ".join(
        (
            "-vn",
            f"-b:a {audio_cfg['bitrate']}k",
            f"-ar {audio_cfg['sample_rate']}",
            "-ac 2",
            f'-af "{filter_chain}"',
        )
    )
    return {"options": flags}
async def generate_tts(text: str, voice_id: str, path: str) -> bool:
    """Synthesise `text` with edge-tts and write the audio to `path`.

    Rate and pitch are taken from `config["tts"]`.

    Returns:
        True when the file was written, False on any synthesis error (the
        error is logged and any partially written file is removed).
    """
    try:
        comm = edge_tts.Communicate(
            text, voice_id, rate=config["tts"]["rate"], pitch=config["tts"]["pitch"]
        )
        await comm.save(path)
        return True
    except Exception as e:
        logger.error(f"TTS error: {e}")
        # A failed save can leave an empty or truncated file behind; callers
        # treat False as "nothing to play", so nobody else would delete it.
        try:
            os.remove(path)
        except OSError:
            pass  # nothing was written
        return False
async def cleanup(path: str, delay: int = 8):
    """Delete a cached audio file after waiting `delay` seconds.

    A file that is already gone is not an error. Any other failure to remove
    it is logged rather than raised, because this runs as a background task.
    """
    await asyncio.sleep(delay)
    try:
        # Remove directly instead of checking existence first: the file may
        # disappear between the check and the removal.
        os.remove(path)
    except FileNotFoundError:
        pass  # already cleaned up
    except OSError as e:
        logger.warning(f"Could not remove cached audio {path}: {e}")
async def ensure_connected(
    guild: discord.Guild, member: discord.Member
) -> discord.VoiceClient | None:
    """Make sure the bot is in the right voice channel for `member`.

    The target is the guild's bound voice channel when one is configured and
    still exists; otherwise the channel `member` is currently in.

    Returns:
        The connected voice client, or None when there is no channel to join
        or connecting/moving failed (the error is logged).
    """
    bound = get_guild(guild.id).get("bound_vc")
    target = discord.utils.get(guild.voice_channels, id=bound) if bound else None
    if target is None and member.voice and member.voice.channel:
        target = member.voice.channel
    if target is None:
        # Covers "bound channel was deleted and the member is not in voice";
        # without this guard the code below dereferences None.
        return None

    vc = voice_clients.get(guild.id)
    try:
        if vc and vc.is_connected():
            if vc.channel.id != target.id:
                await vc.move_to(target)
            return vc
        vc = await target.connect()
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        return None
    voice_clients[guild.id] = vc
    return vc
# guild ID -> task currently draining that guild's TTS queue. Holding the task
# here also keeps a strong reference so it is not garbage-collected mid-run.
_queue_workers: dict[int, asyncio.Task] = {}


async def enqueue(
    guild: discord.Guild, member: discord.Member, text: str, voice_id: str
):
    """Queue a message for speech and make sure a worker is draining the queue.

    A new `queue_worker` task is started only when the guild has no worker or
    its previous worker has finished. Judging by queue size instead would
    start a second, concurrent worker whenever a message arrives while the
    last queued item is still being processed, and would never restart a
    worker that died with items left in the queue.
    """
    q = tts_queues.get(guild.id)
    if q is None:
        q = tts_queues[guild.id] = asyncio.Queue()
    await q.put((text, voice_id, member))
    worker = _queue_workers.get(guild.id)
    if worker is None or worker.done():
        _queue_workers[guild.id] = asyncio.create_task(queue_worker(guild))
# Strong references to fire-and-forget tasks: the event loop itself only keeps
# weak references, so an otherwise unreferenced task may be collected early.
_background_tasks: set = set()


def _spawn_background(coro) -> None:
    """Schedule `coro` as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _dm_quietly(member: discord.Member, text: str) -> None:
    """Send a best-effort DM; a failed DM must never stall the TTS queue."""
    try:
        await member.send(text)
    except Exception as e:
        logger.debug(f"Could not DM {member}: {e}")


async def _speak_item(
    guild: discord.Guild, text: str, voice_id: str, member: discord.Member
) -> None:
    """Connect, synthesise and start playback for one queued message."""
    vc = await ensure_connected(guild, member)
    if vc is None:
        await _dm_quietly(
            member,
            "Could not join a voice channel. Make sure you are in a voice channel (or the bound VC if one is set).",
        )
        return

    last_activity[guild.id] = datetime.now()
    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = os.path.join(AUDIO_CACHE_DIR, f"{guild.id}_{ts}.mp3")

    if not await generate_tts(text, voice_id, path):
        # Drop whatever partial file the failed synthesis left behind.
        _spawn_background(cleanup(path, delay=0))
        await _dm_quietly(
            member, "Failed to generate TTS audio. The TTS service may be unavailable."
        )
        return

    # Wait for the previous message to finish so messages play in order.
    while vc.is_playing():
        await asyncio.sleep(0.1)

    source = None
    try:
        source = discord.FFmpegPCMAudio(path, **ffmpeg_options())
        vc.play(source)
    except Exception as e:
        logger.error(f"Playback error: {e}")
        if source is not None:
            # play() never took ownership, so release the ffmpeg process here.
            source.cleanup()
        _spawn_background(cleanup(path, delay=0))
        await _dm_quietly(
            member, "Failed to play audio. Check that ffmpeg is installed."
        )
        return
    _spawn_background(cleanup(path, delay=12))


async def queue_worker(guild: discord.Guild):
    """Drain the guild's TTS queue, speaking one message at a time.

    Returns when the queue is empty (or the guild has no queue). Each item is
    marked done exactly once, and an unexpected error on one item is logged
    without stopping the worker, so later messages are still spoken.
    """
    q = tts_queues.get(guild.id)
    if q is None:
        return
    while not q.empty():
        text, voice_id, member = await q.get()
        try:
            await _speak_item(guild, text, voice_id, member)
        except Exception as e:
            logger.error(f"TTS queue item failed: {e}")
        finally:
            q.task_done()
@bot.event
async def on_ready():
    """Start the inactivity checker and sync slash commands once logged in."""
    logger.info(f"Logged in as {bot.user}")
    # on_ready may be dispatched again after a reconnect; calling start() on a
    # loop that is already running raises RuntimeError.
    if not check_inactivity.is_running():
        check_inactivity.start()
    try:
        synced = await bot.tree.sync()
        logger.info(f"Synced {len(synced)} commands")
    except Exception as e:
        logger.error(f"Sync error: {e}")
@bot.event
async def on_message(message: discord.Message):
    """Queue a chat message for speech when it qualifies for TTS.

    A message is spoken only if it comes from a non-bot user in a guild, was
    posted in one of the guild's TTS channels, is non-empty and does not start
    with "/", and its author has opted in and is in a voice channel. The text
    is cut to the configured maximum length, and prefixed with the speaker's
    name whenever the speaker differs from the previous one.
    """
    guild = message.guild
    author = message.author
    if author.bot or not guild:
        return

    allowed_channels = get_guild(guild.id).get("tts_channels", [])
    if message.channel.id not in allowed_channels:
        return

    text = message.content.strip()
    if text == "" or text.startswith("/"):
        return

    if author.id not in tts_enabled.get(guild.id, set()):
        return

    voice_state = author.voice
    if not (voice_state and voice_state.channel):
        try:
            await message.reply(
                "You need to be in a voice channel.", mention_author=False
            )
        except Exception:
            pass
        return

    limit = config["tts"]["max_length"]
    if len(text) > limit:
        text = text[:limit]

    voice_id = get_user_voice(guild.id, author.id)
    previous_speaker = last_speaker.get(guild.id)
    last_speaker[guild.id] = author.id
    if previous_speaker != author.id:
        # Announce who is talking only when the speaker changes.
        spoken_name = get_display_name(guild.id, author)
        text = f"{spoken_name}: {text}"

    await enqueue(guild, author, text, voice_id)
@bot.event
async def on_voice_state_update(member: discord.Member, before, after):
    """Leave the voice channel once only bots remain in it.

    Triggered by voice state changes of non-bot members; when the bot's
    current channel no longer contains a human, the bot disconnects and the
    guild's connection bookkeeping is cleared.
    """
    if member.bot:
        return

    guild = member.guild
    connection = voice_clients.get(guild.id)
    if not connection or not connection.is_connected():
        return

    humans_present = any(not m.bot for m in connection.channel.members)
    if humans_present:
        return

    try:
        await connection.disconnect()
    except Exception:
        pass
    voice_clients.pop(guild.id, None)
    last_activity.pop(guild.id, None)
    logger.info(f"Auto-disconnected from {guild.name}")
@tasks.loop(seconds=30)
async def check_inactivity():
    """Disconnect from guilds whose last activity is older than the timeout.

    Runs every 30 seconds. The timeout (in seconds) comes from
    `config["bot"]["inactivity_timeout"]`.
    """
    checked_at = datetime.now()
    limit = timedelta(seconds=config["bot"]["inactivity_timeout"])

    # Collect first: the dictionaries are modified while handling each guild.
    expired = []
    for guild_id, seen_at in last_activity.items():
        if checked_at - seen_at > limit:
            expired.append(guild_id)

    for guild_id in expired:
        connection = voice_clients.get(guild_id)
        if connection:
            try:
                await connection.disconnect()
            except Exception:
                pass
        voice_clients.pop(guild_id, None)
        last_activity.pop(guild_id, None)
        logger.info(f"Inactivity disconnect: {guild_id}")
def admin_only():
    """Return a slash-command check that requires the Manage Server permission."""
    require_permissions = app_commands.checks.has_permissions
    return require_permissions(manage_guild=True)
@bot.tree.command(
    name="setup-channel", description="Add or remove a TTS text channel (admin)"
)
@app_commands.describe(channel="Text channel", action="add or remove")
@app_commands.choices(
    action=[
        app_commands.Choice(name="add", value="add"),
        app_commands.Choice(name="remove", value="remove"),
    ]
)
@admin_only()
async def setup_channel(
    interaction: discord.Interaction, channel: discord.TextChannel, action: str
):
    """Add a text channel to, or remove it from, the guild's TTS channel list.

    The settings are saved only when the list actually changes; the outcome is
    reported to the caller in an ephemeral message.
    """
    tts_ids: list = get_guild(interaction.guild_id).setdefault("tts_channels", [])
    adding = action == "add"
    already_listed = channel.id in tts_ids

    if adding and already_listed:
        reply = f"#{channel.name} is already a TTS channel."
    elif adding:
        tts_ids.append(channel.id)
        save_guild_settings()
        reply = f"#{channel.name} set as TTS channel."
    elif not already_listed:
        reply = f"#{channel.name} is not a TTS channel."
    else:
        tts_ids.remove(channel.id)
        save_guild_settings()
        reply = f"#{channel.name} removed from TTS channels."

    await interaction.response.send_message(reply, ephemeral=True)
@bot.tree.command(
    name="setup-vc", description="Bind or unbind the bot to a voice channel (admin)"
)
@app_commands.describe(channel="Voice channel to bind to", action="bind or unbind")
@app_commands.choices(
    action=[
        app_commands.Choice(name="bind", value="bind"),
        app_commands.Choice(name="unbind", value="unbind"),
    ]
)
@admin_only()
async def setup_vc(
    interaction: discord.Interaction, action: str, channel: discord.VoiceChannel = None
):
    """Bind the bot to one voice channel, or remove an existing binding.

    Binding requires `channel`; any action other than "bind" clears the
    binding. Changes are saved immediately and confirmed ephemerally.
    """
    settings = get_guild(interaction.guild_id)
    respond = interaction.response.send_message

    if action != "bind":
        settings["bound_vc"] = None
        save_guild_settings()
        await respond(
            "VC binding removed. Bot will follow the user's VC.", ephemeral=True
        )
        return

    if channel is None:
        await respond("Provide a voice channel to bind to.", ephemeral=True)
        return

    settings["bound_vc"] = channel.id
    save_guild_settings()
    await respond(f"Bot will always join {channel.name} for TTS.", ephemeral=True)
@bot.tree.command(name="setup-status", description="Show current TTS config (admin)")
@admin_only()
async def setup_status(interaction: discord.Interaction):
    """Show the guild's TTS channels and bound voice channel in an embed."""
    settings = get_guild(interaction.guild_id)

    mentions = [f"<#{cid}>" for cid in settings.get("tts_channels", [])]
    if not mentions:
        mentions = ["none"]

    bound = settings.get("bound_vc")
    if bound:
        bound_text = f"<#{bound}>"
    else:
        bound_text = "none (follows user)"

    embed = discord.Embed(title="TTS Config", color=discord.Color.blurple())
    embed.add_field(name="TTS Channels", value="\n".join(mentions), inline=False)
    embed.add_field(name="Bound VC", value=bound_text, inline=False)
    await interaction.response.send_message(embed=embed, ephemeral=True)
@bot.tree.command(name="voice", description="Set your TTS voice")
@app_commands.describe(language="Language", voice="Voice name")
async def voice_cmd(interaction: discord.Interaction, language: str, voice: str):
    """Set the caller's TTS voice for this guild.

    `language` must be a key of `voices_data["voices"]`, and `voice` must match
    the name or ID (case-insensitively) of a voice listed under that language.
    The lookup is restricted to the chosen language so that the result always
    agrees with the "Options for <language>" list shown on failure.
    """
    catalog = voices_data["voices"]
    if language not in catalog:
        langs = ", ".join(catalog.keys())
        await interaction.response.send_message(
            f"Unknown language. Options: {langs}", ephemeral=True
        )
        return

    wanted = voice.lower()
    match = next(
        (
            v
            for v in catalog[language]
            if wanted in (v["name"].lower(), v["id"].lower())
        ),
        None,
    )
    if match is None:
        available = ", ".join(x["name"] for x in catalog[language])
        await interaction.response.send_message(
            f"Unknown voice. Options for {language}: {available}", ephemeral=True
        )
        return

    set_user_voice(interaction.guild_id, interaction.user.id, match["id"])
    await interaction.response.send_message(
        f"Voice set to {match['name']} ({language})", ephemeral=True
    )
@voice_cmd.autocomplete("language")
async def autocomplete_language(interaction: discord.Interaction, current: str):
    """Suggest up to 25 languages whose name contains the typed text."""
    needle = current.lower()
    suggestions = []
    for lang in voices_data["voices"]:
        if needle in lang.lower():
            suggestions.append(app_commands.Choice(name=lang, value=lang))
    return suggestions[:25]
@voice_cmd.autocomplete("voice")
async def autocomplete_voice(interaction: discord.Interaction, current: str):
    """Suggest up to 25 voices of the already-selected language.

    The language is read from the options of the in-progress interaction;
    nothing is suggested until a known language has been entered.
    """
    language = None
    for option in interaction.data.get("options", []):
        if option["name"] == "language":
            language = option["value"]
            break

    if not language or language not in voices_data["voices"]:
        return []

    needle = current.lower()
    suggestions = []
    for v in voices_data["voices"][language]:
        if needle in v["name"].lower():
            suggestions.append(app_commands.Choice(name=v["name"], value=v["name"]))
    return suggestions[:25]
@bot.tree.command(name="stop", description="Stop TTS and disconnect")
async def stop(interaction: discord.Interaction):
    """Stop playback, discard all queued messages and leave the voice channel."""
    guild_id = interaction.guild_id
    connection = voice_clients.get(guild_id)
    if not (connection and connection.is_connected()):
        await interaction.response.send_message(
            "Not in a voice channel.", ephemeral=True
        )
        return

    if connection.is_playing():
        connection.stop()

    pending = tts_queues.get(guild_id)
    if pending:
        # Throw away everything that has not been spoken yet.
        while not pending.empty():
            try:
                pending.get_nowait()
                pending.task_done()
            except Exception:
                break

    try:
        await connection.disconnect()
    except Exception:
        pass
    voice_clients.pop(guild_id, None)
    last_activity.pop(guild_id, None)
    await interaction.response.send_message("Stopped and disconnected.", ephemeral=True)
@bot.tree.command(
    name="tts-enable", description="Enable TTS for your messages in this server"
)
async def tts_enable(interaction: discord.Interaction):
    """Opt the caller in to TTS for this guild.

    Refused while the guild has no TTS channels configured; opting in twice is
    reported rather than repeated.
    """
    respond = interaction.response.send_message
    if not get_guild(interaction.guild_id).get("tts_channels"):
        await respond(
            "No TTS channels have been configured for this server yet.", ephemeral=True
        )
        return

    opted_in = tts_enabled.setdefault(interaction.guild_id, set())
    user_id = interaction.user.id
    if user_id in opted_in:
        await respond("TTS is already enabled for you.", ephemeral=True)
        return

    opted_in.add(user_id)
    await respond(
        "TTS enabled. Your messages in TTS channels will now be spoken.", ephemeral=True
    )
@bot.tree.command(name="tts-name", description="Set the name TTS reads out for you")
@app_commands.describe(
    name="Name to be read out (leave blank to reset to your display name)"
)
async def tts_name(interaction: discord.Interaction, name: str = None):
    """Set or clear the caller's spoken-name override for this guild."""
    overrides = user_display_names.setdefault(interaction.guild_id, {})
    user_id = interaction.user.id

    if name is not None:
        overrides[user_id] = name
        await interaction.response.send_message(
            f'TTS will now call you "{name}".', ephemeral=True
        )
        return

    # No name given: fall back to the member's display name.
    overrides.pop(user_id, None)
    await interaction.response.send_message(
        "Name reset to your display name.", ephemeral=True
    )
@bot.tree.command(
    name="tts-disable", description="Disable TTS for your messages in this server"
)
async def tts_disable(interaction: discord.Interaction):
    """Opt the caller out of TTS for this guild."""
    opted_in = tts_enabled.get(interaction.guild_id, set())
    user_id = interaction.user.id

    if user_id in opted_in:
        opted_in.discard(user_id)
        reply = "TTS disabled."
    else:
        reply = "TTS is not enabled for you."
    await interaction.response.send_message(reply, ephemeral=True)
@bot.tree.command(name="join", description="Join your current voice channel")
async def join(interaction: discord.Interaction):
    """Bring the bot into the caller's voice channel.

    Moves an existing connection when the bot is already connected elsewhere
    in the guild, and opens a new connection otherwise.
    """
    respond = interaction.response.send_message
    guild_id = interaction.guild_id

    voice_state = interaction.user.voice
    if not (voice_state and voice_state.channel):
        await respond("You are not in a voice channel.", ephemeral=True)
        return
    target = voice_state.channel

    connection = voice_clients.get(guild_id)
    if connection and connection.is_connected():
        if connection.channel.id == target.id:
            await respond("Already in your voice channel.", ephemeral=True)
        else:
            await connection.move_to(target)
            await respond(f"Moved to {target.name}.", ephemeral=True)
        return

    try:
        connection = await target.connect()
        voice_clients[guild_id] = connection
        last_activity[guild_id] = datetime.now()
        await respond(f"Joined {target.name}.", ephemeral=True)
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        await respond(f"Could not join {target.name}: {e}", ephemeral=True)
@bot.tree.command(name="skip", description="Skip the current TTS message")
async def skip(interaction: discord.Interaction):
    """Stop the message that is currently being spoken, if any."""
    connection = voice_clients.get(interaction.guild_id)
    if not (connection and connection.is_playing()):
        await interaction.response.send_message("Nothing is playing.", ephemeral=True)
        return
    connection.stop()
    await interaction.response.send_message("Skipped.", ephemeral=True)
# Script entry point: read the bot token from the environment and run the bot.
if __name__ == "__main__":
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        logger.error("DISCORD_TOKEN not set")
        # `exit` is a helper installed by the `site` module and is not
        # guaranteed to exist (e.g. under `python -S`); raising SystemExit
        # gives the same exit status without depending on it.
        raise SystemExit(1)
    bot.run(token)