mirror of
https://github.com/System-End/tts.git
synced 2026-04-19 14:17:05 +00:00
624 lines
19 KiB
Python
624 lines
19 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
import discord
|
|
import edge_tts
|
|
from discord import app_commands
|
|
from discord.ext import commands, tasks
|
|
|
|
# Timestamped INFO-level logging for the whole process.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Static configuration and the voice catalogue are read once at import time,
# relative to the current working directory.
config = json.loads(Path("config.json").read_text(encoding="utf-8"))
voices_data = json.loads(Path("voices.json").read_text(encoding="utf-8"))

# Writable state lives under DATA_DIR, which the environment may override.
DATA_DIR = os.getenv("DATA_DIR", "/app/data")
os.makedirs(DATA_DIR, exist_ok=True)

SETTINGS_FILE = os.path.join(DATA_DIR, "guild_settings.json")

AUDIO_CACHE_DIR = os.path.join(DATA_DIR, "audio_cache")
os.makedirs(AUDIO_CACHE_DIR, exist_ok=True)

# One-time migration: when DATA_DIR has no settings file yet but one exists
# at /app, copy it across so the existing settings are picked up.
_image_settings = "/app/guild_settings.json"
if os.path.exists(_image_settings) and not os.path.exists(SETTINGS_FILE):
    try:
        shutil.copy(_image_settings, SETTINGS_FILE)
    except Exception as e:
        logger.error(f"Failed to migrate settings: {e}")
    else:
        logger.info("Migrated guild_settings.json into DATA_DIR")
|
|
|
|
|
|
def load_guild_settings() -> dict:
    """Read the persisted per-guild settings from SETTINGS_FILE.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist or cannot be read or parsed (the failure is logged).
    """
    if not os.path.exists(SETTINGS_FILE):
        return {}

    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as handle:
            settings = json.load(handle)
    except Exception as e:
        logger.error(f"Failed to load settings file {SETTINGS_FILE}: {e}")
        return {}
    return settings
|
|
|
|
|
|
def save_guild_settings():
    """Write the durable part of ``guild_settings`` to SETTINGS_FILE.

    Only the ``tts_channels`` and ``bound_vc`` keys of each guild entry are
    persisted. A failure to write is logged and otherwise ignored.
    """
    snapshot = {
        gid: {
            "tts_channels": entry.get("tts_channels", []),
            "bound_vc": entry.get("bound_vc", None),
        }
        for gid, entry in guild_settings.items()
    }

    try:
        with open(SETTINGS_FILE, "w", encoding="utf-8") as out:
            json.dump(snapshot, out, indent=2)
    except Exception as e:
        logger.error(f"Failed to write settings to {SETTINGS_FILE}: {e}")
|
|
|
|
|
|
# Persisted per-guild settings, keyed by the guild id as a string
# (get_guild stringifies the id before every lookup).
guild_settings: dict = load_guild_settings()

# In-memory runtime state; none of the mappings below are written by
# save_guild_settings.
voice_clients: dict[int, discord.VoiceClient] = {}  # guild id -> voice client
last_activity: dict[int, datetime] = {}  # guild id -> time of last TTS/join activity
user_voices: dict[int, dict[int, str]] = {}  # guild id -> user id -> voice id
tts_queues: dict[int, asyncio.Queue] = {}  # guild id -> queue of (text, voice_id, member)
tts_enabled: dict[int, set[int]] = {}  # guild id -> ids of users who enabled TTS
last_speaker: dict[int, int] = {}  # guild id -> id of the user whose message was queued last
user_display_names: dict[int, dict[int, str]] = {}  # guild id -> user id -> custom spoken name

intents = discord.Intents.default()
intents.message_content = True  # on_message reads message.content
intents.voice_states = True  # on_voice_state_update and member.voice are used
# NOTE(review): the NUL command prefix presumably makes prefix commands
# unreachable so that only slash commands are used — confirm.
bot = commands.Bot(command_prefix="\x00", intents=intents)
|
|
|
|
|
|
def get_guild(guild_id: int) -> dict:
    """Return the settings entry for *guild_id*, creating it when missing.

    Entries are stored under the stringified guild id. A freshly created
    entry has no TTS channels and no bound voice channel.
    """
    return guild_settings.setdefault(
        str(guild_id), {"tts_channels": [], "bound_vc": None}
    )
|
|
|
|
|
|
def get_display_name(guild_id: int, member: discord.Member) -> str:
    """Return the name TTS should speak for *member* in the given guild.

    A custom name stored in ``user_display_names`` wins; otherwise the
    member's own display name is used.
    """
    fallback = member.display_name
    overrides = user_display_names.get(guild_id, {})
    return overrides.get(member.id, fallback)
|
|
|
|
|
|
def get_user_voice(guild_id: int, user_id: int) -> str:
    """Return the voice id chosen by the user in this guild.

    Falls back to ``config["tts"]["default_voice"]`` when the user has not
    picked a voice.
    """
    default_voice = config["tts"]["default_voice"]
    chosen_in_guild = user_voices.get(guild_id, {})
    return chosen_in_guild.get(user_id, default_voice)
|
|
|
|
|
|
def set_user_voice(guild_id: int, user_id: int, voice_id: str):
    """Remember *voice_id* as the user's TTS voice for the given guild."""
    guild_voices = user_voices.setdefault(guild_id, {})
    guild_voices[user_id] = voice_id
|
|
|
|
|
|
def find_voice(name: str):
    """Look up a voice by display name or by id, ignoring case.

    Args:
        name: Voice name or voice id to search for.

    Returns:
        ``(voice, language)`` for the first match found while walking
        ``voices_data["voices"]``, or ``(None, None)`` when nothing matches.
    """
    # Lower-case the query once instead of twice per candidate voice.
    wanted = name.lower()
    for lang, voices in voices_data["voices"].items():
        for v in voices:
            if v["name"].lower() == wanted or v["id"].lower() == wanted:
                return v, lang
    return None, None
|
|
|
|
|
|
def ffmpeg_options() -> dict:
    """Build the keyword arguments handed to ``discord.FFmpegPCMAudio``.

    Bitrate and sample rate are taken from ``config["audio"]``. The audio
    filter chain is loudnorm followed by acompressor and can be adjusted
    as needed.
    """
    audio = config["audio"]
    filter_chain = (
        "loudnorm=I=-16:TP=-1.5:LRA=11,"
        "acompressor=threshold=-20dB:ratio=4:attack=5:release=50"
    )
    options = (
        f"-vn -b:a {audio['bitrate']}k "
        f"-ar {audio['sample_rate']} -ac 2 "
        f'-af "{filter_chain}"'
    )
    return {"options": options}
|
|
|
|
|
|
async def generate_tts(text: str, voice_id: str, path: str) -> bool:
    """Synthesize *text* with edge-tts and save the audio to *path*.

    Rate and pitch are taken from ``config["tts"]``.

    Args:
        text: Text to speak.
        voice_id: edge-tts voice identifier.
        path: Destination file for the generated audio.

    Returns:
        True when the file was written; False on any failure, which is
        logged. On failure a partially written file at *path* is removed.
    """
    try:
        comm = edge_tts.Communicate(
            text, voice_id, rate=config["tts"]["rate"], pitch=config["tts"]["pitch"]
        )
        await comm.save(path)
    except Exception as e:
        logger.error(f"TTS error: {e}")
        # save() may already have created the file before failing, and the
        # caller schedules no cleanup for a failed generation, so remove
        # the leftover here to keep the audio cache from growing.
        try:
            os.remove(path)
        except OSError:
            pass
        return False
    return True
|
|
|
|
|
|
async def cleanup(path: str, delay: int = 8):
    """Delete the file at *path* after waiting *delay* seconds.

    This is best-effort: a missing file or any error while removing it is
    silently ignored.
    """
    await asyncio.sleep(delay)
    try:
        os.remove(path)
    except Exception:
        pass
|
|
|
|
|
|
async def ensure_connected(
    guild: discord.Guild, member: discord.Member
) -> discord.VoiceClient | None:
    """Return a voice client connected to the channel TTS should play in.

    Target selection:
      * If the guild has a bound voice channel, use it. When that channel
        cannot be found among the guild's voice channels, fall back to the
        member's current voice channel.
      * Otherwise use the member's current voice channel.

    An already connected client is reused and moved to the target when it
    sits in a different channel.

    Returns:
        The connected voice client, or None when no target channel could be
        determined or when connecting or moving failed (failures are logged).
    """
    bound = get_guild(guild.id).get("bound_vc")
    member_channel = member.voice.channel if member.voice else None

    target = None
    if bound:
        target = discord.utils.get(guild.voice_channels, id=bound)
    if target is None:
        target = member_channel
    if target is None:
        # Covers the case of a bound channel that no longer exists while the
        # member is not in voice; previously this reached `target.id` below
        # and raised AttributeError.
        return None

    vc = voice_clients.get(guild.id)
    if vc and vc.is_connected():
        if vc.channel.id != target.id:
            try:
                await vc.move_to(target)
            except Exception as e:
                logger.error(f"VC move error: {e}")
                return None
        return vc

    try:
        vc = await target.connect()
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        return None
    voice_clients[guild.id] = vc
    return vc
|
|
|
|
|
|
# Worker task per guild id. Holding the Task keeps a strong reference to it
# while it runs and lets enqueue() see whether a worker is still alive.
_queue_worker_tasks: dict[int, asyncio.Task] = {}


async def enqueue(
    guild: discord.Guild, member: discord.Member, text: str, voice_id: str
):
    """Queue one utterance for *guild* and make sure a worker is draining it.

    The item is stored as ``(text, voice_id, member)``. A worker is started
    only when the guild has no live worker task.

    The previous ``q.qsize() == 1`` test was also true while a worker was
    still busy with an item it had already dequeued, which started a second
    concurrent worker. It was never true again if a worker died with items
    left in the queue, so the queue stalled.
    """
    q = tts_queues.get(guild.id)
    if q is None:
        q = tts_queues[guild.id] = asyncio.Queue()
    await q.put((text, voice_id, member))

    worker = _queue_worker_tasks.get(guild.id)
    if worker is None or worker.done():
        _queue_worker_tasks[guild.id] = asyncio.create_task(queue_worker(guild))
|
|
|
|
|
|
async def _dm_member(member: discord.Member, message: str) -> None:
    """Send *message* to *member* by DM; delivery failures are ignored."""
    try:
        await member.send(message)
    except Exception:
        pass


async def _play_utterance(
    guild: discord.Guild, text: str, voice_id: str, member: discord.Member
) -> None:
    """Connect, synthesize *text* to a cache file and start playing it."""
    vc = await ensure_connected(guild, member)
    if vc is None:
        await _dm_member(
            member,
            "Could not join a voice channel. Make sure you are in a voice channel (or the bound VC if one is set).",
        )
        return

    last_activity[guild.id] = datetime.now()

    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = os.path.join(AUDIO_CACHE_DIR, f"{guild.id}_{ts}.mp3")

    if not await generate_tts(text, voice_id, path):
        # Remove whatever a failed generation left behind.
        await cleanup(path, delay=0)
        await _dm_member(
            member, "Failed to generate TTS audio. The TTS service may be unavailable."
        )
        return

    # Wait for the previous utterance to finish before starting this one.
    while vc.is_playing():
        await asyncio.sleep(0.1)

    try:
        source = discord.FFmpegPCMAudio(path, **ffmpeg_options())
        vc.play(source)
    except Exception as e:
        logger.error(f"Playback error: {e}")
        # Nothing will play this file, so delete it now instead of leaking it.
        await cleanup(path, delay=0)
        await _dm_member(member, "Failed to play audio. Check that ffmpeg is installed.")
    else:
        # Delete the file 12 seconds after playback was started.
        asyncio.create_task(cleanup(path, delay=12))


async def queue_worker(guild: discord.Guild):
    """Drain the guild's TTS queue, playing one utterance after another.

    Returns immediately when the guild has no queue, and ends once the
    queue is empty. Each item is marked done whether or not it could be
    played. An unexpected error on one item is logged and the worker moves
    on to the next item instead of dying with the rest still queued.
    """
    q = tts_queues.get(guild.id)
    if q is None:
        return

    while not q.empty():
        text, voice_id, member = await q.get()
        try:
            await _play_utterance(guild, text, voice_id, member)
        except Exception:
            logger.exception("Unhandled error while processing TTS for guild %s", guild.id)
        finally:
            q.task_done()
|
|
|
|
|
|
@bot.event
async def on_ready():
    """Start the inactivity watchdog and sync the slash commands.

    A failure to sync is logged and does not stop the bot.
    """
    logger.info(f"Logged in as {bot.user}")
    # on_ready can fire more than once (for example after a reconnect), and
    # starting a loop that is already running raises RuntimeError.
    if not check_inactivity.is_running():
        check_inactivity.start()
    try:
        synced = await bot.tree.sync()
    except Exception as e:
        logger.error(f"Sync error: {e}")
    else:
        logger.info(f"Synced {len(synced)} commands")
|
|
|
|
|
|
@bot.event
async def on_message(message: discord.Message):
    """Queue a message for TTS when it qualifies.

    A message is spoken only if it comes from a non-bot user in a guild, was
    posted in one of the guild's TTS channels, is non-empty, does not start
    with "/", its author has enabled TTS, and the author is in a voice
    channel. Text longer than ``config["tts"]["max_length"]`` is truncated.
    The speaker's name is prefixed whenever the speaker differs from the
    guild's previous one.
    """
    guild = message.guild
    author = message.author
    if author.bot or not guild:
        return

    settings = get_guild(guild.id)
    if message.channel.id not in settings.get("tts_channels", []):
        return

    text = message.content.strip()
    if not text or text.startswith("/"):
        return

    if author.id not in tts_enabled.get(guild.id, set()):
        return

    if not author.voice or not author.voice.channel:
        try:
            await message.reply(
                "You need to be in a voice channel.", mention_author=False
            )
        except Exception:
            pass
        return

    if len(text) > config["tts"]["max_length"]:
        text = text[: config["tts"]["max_length"]]

    voice_id = get_user_voice(guild.id, author.id)

    # Announce the name only when the speaker changes.
    previous_speaker = last_speaker.get(guild.id)
    last_speaker[guild.id] = author.id
    if previous_speaker != author.id:
        spoken_name = get_display_name(guild.id, author)
        text = f"{spoken_name}: {text}"

    await enqueue(guild, author, text, voice_id)
|
|
|
|
|
|
@bot.event
async def on_voice_state_update(member: discord.Member, before, after):
    """Leave the voice channel once only bots remain in it.

    Voice-state changes of bots are ignored. When the guild's voice client is
    connected and every member of its channel is a bot, the client is
    disconnected and the guild's voice client and activity entries are
    dropped.
    """
    if member.bot:
        return

    guild = member.guild
    vc = voice_clients.get(guild.id)
    if not vc or not vc.is_connected():
        return

    if any(not m.bot for m in vc.channel.members):
        return

    try:
        await vc.disconnect()
    except Exception:
        pass
    voice_clients.pop(guild.id, None)
    last_activity.pop(guild.id, None)
    logger.info(f"Auto-disconnected from {guild.name}")
|
|
|
|
|
|
@tasks.loop(seconds=30)
async def check_inactivity():
    """Disconnect from guilds idle longer than the configured timeout.

    Runs every 30 seconds. A guild is idle when its ``last_activity`` time is
    older than ``config["bot"]["inactivity_timeout"]`` seconds.
    """
    cutoff = datetime.now() - timedelta(seconds=config["bot"]["inactivity_timeout"])
    # Collect the ids first so the dict is not modified while it is iterated.
    expired = [gid for gid, seen in last_activity.items() if seen < cutoff]

    for gid in expired:
        vc = voice_clients.get(gid)
        if vc:
            try:
                await vc.disconnect()
            except Exception:
                pass
        voice_clients.pop(gid, None)
        last_activity.pop(gid, None)
        logger.info(f"Inactivity disconnect: {gid}")
|
|
|
|
|
|
def admin_only():
    """Return the app-command check that requires the ``manage_guild`` permission."""
    required_permissions = {"manage_guild": True}
    return app_commands.checks.has_permissions(**required_permissions)
|
|
|
|
|
|
@bot.tree.command(
    name="setup-channel", description="Add or remove a TTS text channel (admin)"
)
@app_commands.describe(channel="Text channel", action="add or remove")
@app_commands.choices(
    action=[
        app_commands.Choice(name="add", value="add"),
        app_commands.Choice(name="remove", value="remove"),
    ]
)
@admin_only()
async def setup_channel(
    interaction: discord.Interaction, channel: discord.TextChannel, action: str
):
    """Add *channel* to, or remove it from, the guild's TTS channels.

    The settings are saved only when the list actually changes. The caller
    always gets an ephemeral reply describing the outcome.
    """
    settings = get_guild(interaction.guild_id)
    channel_ids: list = settings.setdefault("tts_channels", [])
    registered = channel.id in channel_ids

    if action == "add":
        if registered:
            reply = f"#{channel.name} is already a TTS channel."
        else:
            channel_ids.append(channel.id)
            save_guild_settings()
            reply = f"#{channel.name} set as TTS channel."
    elif not registered:
        reply = f"#{channel.name} is not a TTS channel."
    else:
        channel_ids.remove(channel.id)
        save_guild_settings()
        reply = f"#{channel.name} removed from TTS channels."

    await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(
    name="setup-vc", description="Bind or unbind the bot to a voice channel (admin)"
)
@app_commands.describe(channel="Voice channel to bind to", action="bind or unbind")
@app_commands.choices(
    action=[
        app_commands.Choice(name="bind", value="bind"),
        app_commands.Choice(name="unbind", value="unbind"),
    ]
)
@admin_only()
async def setup_vc(
    interaction: discord.Interaction, action: str, channel: discord.VoiceChannel = None
):
    """Bind the guild's TTS playback to *channel*, or remove the binding.

    Binding requires a channel. Any action other than "bind" clears the
    binding. The settings are saved after every change.
    """
    settings = get_guild(interaction.guild_id)

    if action != "bind":
        settings["bound_vc"] = None
        save_guild_settings()
        reply = "VC binding removed. Bot will follow the user's VC."
    elif channel is None:
        reply = "Provide a voice channel to bind to."
    else:
        settings["bound_vc"] = channel.id
        save_guild_settings()
        reply = f"Bot will always join {channel.name} for TTS."

    await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(name="setup-status", description="Show current TTS config (admin)")
@admin_only()
async def setup_status(interaction: discord.Interaction):
    """Reply with an ephemeral embed listing the TTS channels and the bound VC."""
    settings = get_guild(interaction.guild_id)
    channel_ids = settings.get("tts_channels", [])
    bound_vc = settings.get("bound_vc")

    if channel_ids:
        channels_text = "\n".join(f"<#{cid}>" for cid in channel_ids)
    else:
        channels_text = "none"

    if bound_vc:
        vc_text = f"<#{bound_vc}>"
    else:
        vc_text = "none (follows user)"

    embed = discord.Embed(title="TTS Config", color=discord.Color.blurple())
    embed.add_field(name="TTS Channels", value=channels_text, inline=False)
    embed.add_field(name="Bound VC", value=vc_text, inline=False)

    await interaction.response.send_message(embed=embed, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(name="voice", description="Set your TTS voice")
@app_commands.describe(language="Language", voice="Voice name")
async def voice_cmd(interaction: discord.Interaction, language: str, voice: str):
    """Set the caller's TTS voice for this guild.

    Args:
        interaction: The slash-command interaction.
        language: A key of ``voices_data["voices"]``.
        voice: Name or id of a voice, matched case-insensitively.

    The voice is looked up only among the voices of *language*. Searching
    every language could silently select a voice from a different language
    than the one chosen, which contradicts the "Options for {language}"
    error message. Every outcome is reported in an ephemeral reply.
    """
    catalogue = voices_data["voices"]
    if language not in catalogue:
        langs = ", ".join(catalogue.keys())
        await interaction.response.send_message(
            f"Unknown language. Options: {langs}", ephemeral=True
        )
        return

    wanted = voice.lower()
    match = next(
        (
            v
            for v in catalogue[language]
            if v["name"].lower() == wanted or v["id"].lower() == wanted
        ),
        None,
    )
    if match is None:
        available = ", ".join(x["name"] for x in catalogue[language])
        await interaction.response.send_message(
            f"Unknown voice. Options for {language}: {available}", ephemeral=True
        )
        return

    set_user_voice(interaction.guild_id, interaction.user.id, match["id"])
    await interaction.response.send_message(
        f"Voice set to {match['name']} ({language})", ephemeral=True
    )
|
|
|
|
|
|
@voice_cmd.autocomplete("language")
async def autocomplete_language(interaction: discord.Interaction, current: str):
    """Suggest up to 25 languages whose name contains *current*, ignoring case."""
    needle = current.lower()
    suggestions = []
    for lang in voices_data["voices"]:
        if needle in lang.lower():
            suggestions.append(app_commands.Choice(name=lang, value=lang))
    return suggestions[:25]
|
|
|
|
|
|
@voice_cmd.autocomplete("voice")
async def autocomplete_voice(interaction: discord.Interaction, current: str):
    """Suggest up to 25 voices of the language already entered.

    The language is read from the interaction's raw option list. When it is
    missing or unknown, no suggestions are returned.
    """
    language = None
    for option in interaction.data.get("options", []):
        if option["name"] == "language":
            language = option["value"]
            break

    if not language or language not in voices_data["voices"]:
        return []

    needle = current.lower()
    suggestions = [
        app_commands.Choice(name=v["name"], value=v["name"])
        for v in voices_data["voices"][language]
        if needle in v["name"].lower()
    ]
    return suggestions[:25]
|
|
|
|
|
|
@bot.tree.command(name="stop", description="Stop TTS and disconnect")
async def stop(interaction: discord.Interaction):
    """Stop playback, discard the guild's queued messages and disconnect."""
    gid = interaction.guild_id
    vc = voice_clients.get(gid)

    if not (vc and vc.is_connected()):
        await interaction.response.send_message(
            "Not in a voice channel.", ephemeral=True
        )
        return

    if vc.is_playing():
        vc.stop()

    # Throw away everything still waiting to be spoken.
    pending = tts_queues.get(gid)
    if pending:
        while not pending.empty():
            try:
                pending.get_nowait()
                pending.task_done()
            except Exception:
                break

    try:
        await vc.disconnect()
    except Exception:
        pass
    voice_clients.pop(gid, None)
    last_activity.pop(gid, None)

    await interaction.response.send_message("Stopped and disconnected.", ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(
    name="tts-enable", description="Enable TTS for your messages in this server"
)
async def tts_enable(interaction: discord.Interaction):
    """Opt the caller in to TTS for this guild.

    Refused while the guild has no TTS channels configured.
    """
    if not get_guild(interaction.guild_id).get("tts_channels"):
        await interaction.response.send_message(
            "No TTS channels have been configured for this server yet.", ephemeral=True
        )
        return

    opted_in = tts_enabled.setdefault(interaction.guild_id, set())
    user_id = interaction.user.id
    if user_id in opted_in:
        reply = "TTS is already enabled for you."
    else:
        opted_in.add(user_id)
        reply = "TTS enabled. Your messages in TTS channels will now be spoken."

    await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(name="tts-name", description="Set the name TTS reads out for you")
@app_commands.describe(
    name="Name to be read out (leave blank to reset to your display name)"
)
async def tts_name(interaction: discord.Interaction, name: str = None):
    """Set or reset the name TTS speaks for the caller in this guild.

    Omitting *name* removes the custom name.
    """
    overrides = user_display_names.setdefault(interaction.guild_id, {})
    user_id = interaction.user.id

    if name is not None:
        overrides[user_id] = name
        reply = f'TTS will now call you "{name}".'
    else:
        overrides.pop(user_id, None)
        reply = "Name reset to your display name."

    await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(
    name="tts-disable", description="Disable TTS for your messages in this server"
)
async def tts_disable(interaction: discord.Interaction):
    """Opt the caller out of TTS for this guild."""
    opted_in = tts_enabled.get(interaction.guild_id, set())
    user_id = interaction.user.id

    if user_id in opted_in:
        opted_in.discard(user_id)
        reply = "TTS disabled."
    else:
        reply = "TTS is not enabled for you."

    await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(name="join", description="Join your current voice channel")
async def join(interaction: discord.Interaction):
    """Connect the bot to the caller's voice channel, or move it there.

    Replies ephemerally with the outcome. Changes from the earlier version:
      * The caller's voice state is read defensively, because in a DM
        ``interaction.user`` has no ``voice`` attribute.
      * The interaction is deferred before connecting or moving, because
        the voice handshake can take longer than the time allowed for the
        initial response.
      * A failed move is reported instead of raising.
      * The guild's activity time is refreshed on a move as well as on a
        fresh connect.
    """
    voice_state = getattr(interaction.user, "voice", None)
    if voice_state is None or voice_state.channel is None:
        await interaction.response.send_message(
            "You are not in a voice channel.", ephemeral=True
        )
        return

    gid = interaction.guild_id
    target = voice_state.channel
    vc = voice_clients.get(gid)
    already_connected = bool(vc) and vc.is_connected()

    if already_connected and vc.channel.id == target.id:
        await interaction.response.send_message(
            "Already in your voice channel.", ephemeral=True
        )
        return

    await interaction.response.defer(ephemeral=True)

    try:
        if already_connected:
            await vc.move_to(target)
            reply = f"Moved to {target.name}."
        else:
            vc = await target.connect()
            voice_clients[gid] = vc
            reply = f"Joined {target.name}."
        last_activity[gid] = datetime.now()
    except Exception as e:
        logger.error(f"VC connect error: {e}")
        reply = f"Could not join {target.name}: {e}"

    await interaction.followup.send(reply, ephemeral=True)
|
|
|
|
|
|
@bot.tree.command(name="skip", description="Skip the current TTS message")
async def skip(interaction: discord.Interaction):
    """Stop the utterance that is currently playing, if any."""
    vc = voice_clients.get(interaction.guild_id)
    if not vc or not vc.is_playing():
        await interaction.response.send_message("Nothing is playing.", ephemeral=True)
        return

    vc.stop()
    await interaction.response.send_message("Skipped.", ephemeral=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: the bot token must come from the environment.
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        logger.error("DISCORD_TOKEN not set")
        # The bare `exit` helper is injected by the `site` module and is not
        # guaranteed to exist (e.g. under `python -S`); SystemExit always is.
        raise SystemExit(1)
    bot.run(token)
|