aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Perry <np@nathanperry.dev>2024-08-08 14:53:04 -0400
committerNathan Perry <np@nathanperry.dev>2024-08-08 14:53:04 -0400
commit0659c3be4e996ca97a1a9d02142f2809507d2b38 (patch)
treea80381a8092cdb4587c0623921f355308408ede5
parent7ccdb63fb7f4008b7d785c687b4f55b6b4291483 (diff)
voice playback workselixir
-rw-r--r--config/config.exs6
-rw-r--r--config/runtime.exs2
-rw-r--r--lib/application.ex1
-rw-r--r--lib/audio/server.ex220
-rw-r--r--lib/audio/supervisor.ex15
-rw-r--r--lib/audio/util.ex114
-rw-r--r--lib/command/util.ex2
-rw-r--r--lib/meme.ex2
-rw-r--r--lib/util/stream.ex19
9 files changed, 349 insertions, 32 deletions
diff --git a/config/config.exs b/config/config.exs
index 206f883..d8bc01b 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -2,14 +2,18 @@ import Config
config :nostrum,
gateway_intents: [
+ :guilds,
:guild_messages,
:guild_message_reactions,
+ :guild_voice_states,
+ :direct_messages,
+ :direct_message_reactions,
:message_content
]
config :logger,
compile_time_purge_matching: [
- [application: :nostrum, level_lower_than: :warning]
+ # [application: :nostrum, level_lower_than: :warning]
]
config :logger, :console,
diff --git a/config/runtime.exs b/config/runtime.exs
index 0204d77..48511c4 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -14,3 +14,5 @@ if is_nil(Application.get_env(:nostrum, :token)) do
discord_token = System.get_env("DISCORD_TOKEN")
config :nostrum, token: discord_token
end
+
+config :nostrum, youtubedl: "yt-dlp"
diff --git a/lib/application.ex b/lib/application.ex
index 958ab7d..ae07c88 100644
--- a/lib/application.ex
+++ b/lib/application.ex
@@ -6,6 +6,7 @@ defmodule Thulani do
@children [
{Phoenix.PubSub, name: :thulani_pubsub},
{Registry, keys: :unique, name: Thulani.Audio.Registry},
+ Thulani.Audio.Supervisor,
Thulani.Repo,
Thulani.DiscordConsumer,
Nosedrum.TextCommand.Storage.ETS
diff --git a/lib/audio/server.ex b/lib/audio/server.ex
index 220f898..63ea7ea 100644
--- a/lib/audio/server.ex
+++ b/lib/audio/server.ex
@@ -1,55 +1,99 @@
+require Logger
+
alias Nostrum.Struct
-defmodule Audio.Server do
+alias Thulani.Audio.Util
+
+defmodule Thulani.Audio.Server do
use GenServer
@type which :: String.t()
- @type audio_ref :: {}
+ @type audio_ref :: Nostrum.Voice.play_input()
+
+ def start_link(guild_id) when is_integer(guild_id) do
+ GenServer.start_link(__MODULE__, guild_id,
+ name: {:via, Registry, {Thulani.Audio.Registry, guild_id}}
+ )
+ end
- def start_link(arg) do
- {:ok, _} = DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__.Supervisor)
- {:ok, _} = Registry.start_link(keys: :unique, name: __MODULE__.Registry)
+ @spec play_now(Struct.Guild.id(), audio_ref, keyword()) :: which
+ def play_now(guild_id, channel, audio_ref, opts \\ []) do
+ dispatch(guild_id, {:now, {channel, audio_ref, opts}})
end
- @spec play_now(Struct.Guild.id(), audio_ref) :: which
- def play_now(guild, audio_ref) do
- {:ok, which} = GenServer.call(__MODULE__, {:now, guild, audio_ref})
- which
+ @spec enqueue(Struct.Guild.id(), audio_ref, keyword()) :: which
+ def enqueue(guild_id, channel, audio_ref, opts \\ []) do
+ dispatch(guild_id, {:enqueue, {channel, audio_ref, opts}})
end
- @spec enqueue(Struct.Guild.id(), audio_ref) :: which
- def enqueue(_guild, _audio_ref) do
- GenServer.call(__MODULE__, {:enqueue})
+ @spec cancel(Struct.Guild.id(), which) :: GenServer.call()
+ def cancel(guild_id, which) do
+ dispatch(guild_id, {:cancel, which})
end
- @spec cancel(which) :: GenServer.call()
- def cancel(which) do
- GenServer.call(__MODULE__, {:cancel, which})
+ @spec stop(Struct.Guild.id()) :: GenServer.call()
+ def stop(guild_id) do
+ dispatch(guild_id, :stop)
end
+ ####
+
@impl true
- def init(guild) do
- {:ok, {guild, :queue.new()}}
+ def init(guild_id) do
+ Nostrum.Voice.leave_channel(guild_id)
+ schedule_tick()
+
+ guild = Nostrum.Cache.GuildCache.get!(guild_id)
+ Logger.info("starting voice for guild '#{guild.name}' (#{guild_id})")
+ {:ok, {guild_id, :queue.new()}}
end
@impl true
- def handle_call({:now, {type, ref, opts}}, _from, {guild, q}) do
- if Nostrum.Voice.playing?(guild) do
- :ok = Nostrum.Voice.stop(guild)
- end
+ def handle_call({:now, {channel, ref, opts}}, _from, {guild, q}) do
+ guild_ = Nostrum.Cache.GuildCache.get!(guild)
+ channel_ = guild_.channels[channel]
- id = UUID.uuid4()
- Nostrum.Voice.play(guild, ref, type, opts)
+ {result, q} =
+ with :ok <- play(guild, channel, ref, opts) do
+ id = UUID.uuid4()
+ q = :queue.in_r({id, channel, ref, opts}, q)
- q = :queue.in_r({id, ref}, q)
- {:reply, :ok, {guild, q}}
+ Logger.info(
+ "started playback #{id} on '#{guild_.name}'/##{channel_.name}",
+ guild: guild,
+ channel: channel,
+ playback: id
+ )
+
+ {{:ok, id}, q}
+ else
+ e -> {e, q}
+ end
+
+ {:reply, result, {guild, q}}
end
@impl true
- def handle_call({:enqueue, audio_ref}, _from, {_guild, q}) do
- id = UUID.uuid4()
- q = :queue.in({id, audio_ref}, q)
- {:reply, {:ok, id}, {id, q}}
+ def handle_call({:enqueue, {channel, audio_ref, opts}}, from, {guild, q}) do
+ if :queue.is_empty(q) do
+ # start playback immediately if queue is empty
+ handle_call({:now, {channel, audio_ref, opts}}, from, {guild, q})
+ else
+ id = UUID.uuid4()
+ q = :queue.in({id, channel, audio_ref, opts}, q)
+
+ guild_ = Nostrum.Cache.GuildCache.get!(guild)
+ channel_ = guild_.channels[channel]
+
+ Logger.info(
+ "queued playback #{id} for '#{guild_.name}'/##{channel_.name}",
+ guild: guild,
+ channel: channel,
+ playback: id
+ )
+
+ {:reply, {:ok, id}, {guild, q}}
+ end
end
@impl true
@@ -57,4 +101,122 @@ defmodule Audio.Server do
q = :queue.delete_with(fn _elem -> false end, q)
{:reply, :ok, {guild, q}}
end
+
+ @impl true
+ def handle_call(:stop, _from, {guild, _q}) do
+ result = Nostrum.Voice.stop(guild)
+
+ {:reply, result, {guild, :queue.new()}}
+ end
+
+ @impl true
+ def handle_info(:tick, {guild, q} = state) do
+ state =
+ if Nostrum.Voice.playing?(guild) do
+ state
+ else
+ q =
+ if not :queue.is_empty(q) do
+ {{:value, {id, _channel, ref, opts}}, q} = :queue.out(q)
+ Logger.info("completed playback #{id}", id: id, ref: ref, opts: opts)
+
+ q
+ else
+ q
+ end
+
+ if :queue.is_empty(q) do
+ if not Util.disconnected?(guild) do
+ Logger.debug("no audio in queue, disconnecting from voice")
+
+ Util.disconnect(guild)
+ Logger.debug("waiting for disconnect")
+
+ if Util.wait_disconnected(guild) do
+ Logger.debug("disconnected")
+ else
+ Logger.debug("disconnect timed out")
+ end
+ end
+ else
+ {id, channel, ref, opts} = :queue.head(q)
+
+ Logger.debug("starting queued playback #{id}")
+ play(guild, channel, ref, opts)
+ end
+
+ {guild, q}
+ end
+
+ schedule_tick()
+ {:noreply, state}
+ end
+
+ ###
+
+ defp tick_interval, do: 100
+
+ defp schedule_tick(interval \\ tick_interval()) when is_integer(interval) do
+ Process.send_after(self(), :tick, tick_interval())
+ end
+
+ defp play(guild, channel, ref, opts) do
+ {:ok, args} = play_args(ref, opts)
+
+ if Nostrum.Voice.playing?(guild) do
+ Nostrum.Voice.stop(guild)
+ end
+
+ :ok = Nostrum.Voice.join_channel(guild, channel, false, true)
+
+ if Util.wait_ready(guild) do
+ Logger.debug("connected")
+ apply(Nostrum.Voice, :play, [guild | args])
+
+ Logger.debug("waiting for playing back")
+
+ if not Util.wait_playing(guild) do
+ {:error, :playback_timed_out}
+ else
+ Logger.debug("playback started")
+ :ok
+ end
+ else
+ {:error, :voice_unready}
+ end
+ end
+
+ defp play_args(ref, opts) do
+ with {:ok, type} <- Util.stream_type(ref) do
+ {:ok, [ref, type, opts]}
+ else
+ {:error, _} = err ->
+ err
+
+ otherwise ->
+ dbg(otherwise)
+ {:error, :unknown}
+ end
+ end
+
+ defp dispatch(guild_id, args) do
+ ensure(guild_id)
+ call(guild_id, args)
+ end
+
+ defp call(guild_id, args), do: GenServer.call(address(guild_id), args)
+ defp address(guild_id), do: {:via, Registry, {Thulani.Audio.Registry, guild_id}}
+
+ defp ensure(guild_id) do
+ case Registry.lookup(Thulani.Audio.Registry, guild_id) do
+ [{pid, _}] when is_pid(pid) ->
+ nil
+
+ [] ->
+ {:ok, _} = Thulani.Audio.Supervisor.start_child(guild_id)
+
+ otherwise ->
+ dbg(otherwise)
+ end
+ end
end
diff --git a/lib/audio/supervisor.ex b/lib/audio/supervisor.ex
index 0b86a1c..290dd95 100644
--- a/lib/audio/supervisor.ex
+++ b/lib/audio/supervisor.ex
@@ -1,2 +1,17 @@
defmodule Thulani.Audio.Supervisor do
+ use DynamicSupervisor
+ alias Thulani.Audio
+
+ def start_link(arg) do
+ DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
+ end
+
+ def start_child(guild_id) do
+ DynamicSupervisor.start_child(__MODULE__, {Audio.Server, guild_id})
+ end
+
+ @impl true
+ def init(_arg) do
+ DynamicSupervisor.init(strategy: :one_for_one)
+ end
end
diff --git a/lib/audio/util.ex b/lib/audio/util.ex
new file mode 100644
index 0000000..f45af4e
--- /dev/null
+++ b/lib/audio/util.ex
@@ -0,0 +1,114 @@
+defmodule Thulani.Audio.Util do
+ @spec stream_type(Nostrum.Voice.play_input()) ::
+ {:ok, Nostrum.Voice.play_type()} | {:error, atom()}
+
+ def stream_type(ref) when is_binary(ref) do
+ cond do
+ ytdl_link?(ref) -> {:ok, :ytdl}
+ streamlink?(ref) -> {:ok, :stream}
+ valid_url?(ref) -> {:ok, :url}
+ true -> {:ok, :pipe}
+ end
+ end
+
+ def stream_type(_ref) do
+ {:error, :invalid}
+ end
+
+ @spec ytdl_link?(String.t()) :: boolean
+ def ytdl_link?(s) when is_binary(s) do
+ valid_url?(s) and
+ ytdl_checks()
+ |> any_check?(s)
+ end
+
+ def ytdl_link?(_s), do: false
+
+ @spec streamlink?(String.t()) :: boolean
+ def streamlink?(s) when is_binary(s) do
+ valid_url?(s) and streamlink_checks() |> any_check?(s)
+ end
+
+ def streamlink?(_s), do: false
+
+ def valid_url?(s), do: String.match?(s, ~r/^https:\/\/.*/i)
+
+ def ytdl_checks, do: [&youtube_link?/1, &vimeo_link?/1]
+ def streamlink_checks, do: [&twitch_link?/1]
+
+ def youtube_link?(s),
+ do: String.match?(s, youtube_regex())
+
+ def vimeo_link?(s),
+ do: String.match?(s, vimeo_regex())
+
+ def twitch_link?(s),
+ do: String.match?(s, twitch_regex())
+
+ def youtube_regex, do: ~r/^https?:\/\/(?:www\.)?(?:youtube\.com|youtu\.be)\/.*/i
+ def vimeo_regex, do: ~r/^https?:\/\/(?:www\.)?(?:vimeo\.com)\/.*/i
+ def twitch_regex, do: ~r/^https?:\/\/(?:www\.)?(?:twitch\.com)\/.*/i
+
+ defp any_check?(checks, s) do
+ checks
+ |> Stream.map(fn check -> check.(s) end)
+ |> Enum.reduce(false, fn acc, x -> acc or x end)
+ end
+
+ def wait_ready(guild, interval \\ 100, count \\ 10) do
+ wait_for(guild, interval, count, &Nostrum.Voice.ready?/1)
+ end
+
+ def wait_playing(guild, interval \\ 200, count \\ 30) do
+ wait_for(guild, interval, count, &Nostrum.Voice.playing?/1)
+ end
+
+ def wait_no_channel(guild, interval \\ 200, count \\ 30) do
+ wait_for(guild, interval, count, fn guild -> no_channel?(guild) end)
+ end
+
+ def wait_disconnected(guild, interval \\ 200, count \\ 30) do
+ wait_for(guild, interval, count, fn guild -> disconnected?(guild) end)
+ end
+
+ defp wait_for(guild, interval, count, check) do
+ unless check.(guild) do
+ [playing] =
+ Stream.unfold(0, fn i ->
+ Process.sleep(interval)
+ playing_state = check.(guild)
+ {playing_state, i + 1}
+ end)
+ |> Thulani.Util.Stream.take_until(&Function.identity/1)
+ |> Stream.take(count)
+ |> Enum.take(-1)
+
+ playing
+ else
+ true
+ end
+ end
+
+ def connected?(guild) do
+ voice_state = Nostrum.Voice.get_voice(guild)
+ !is_nil(voice_state) && !is_nil(voice_state.channel_id)
+ end
+
+ def disconnect(guild) do
+ voice_state = Nostrum.Voice.get_voice(guild)
+ Nostrum.Voice.leave_channel(guild)
+
+ if not is_nil(voice_state) do
+ Nostrum.Voice.Session.close_connection(voice_state.session_pid)
+ end
+ end
+
+ def disconnected?(guild) do
+ Nostrum.Voice.get_voice(guild) |> is_nil
+ end
+
+ def no_channel?(guild) do
+ voice_state = Nostrum.Voice.get_voice(guild)
+ !is_nil(voice_state) && is_nil(voice_state.channel_id)
+ end
+end
diff --git a/lib/command/util.ex b/lib/command/util.ex
index d47dd59..36447cc 100644
--- a/lib/command/util.ex
+++ b/lib/command/util.ex
@@ -13,7 +13,7 @@ defmodule Thulani.Command.Util do
},
msg
)
- when is_bitstring(msg) do
+ when is_binary(msg) do
Api.create_message(channel_id, content: msg)
end
diff --git a/lib/meme.ex b/lib/meme.ex
index 36048f8..caeccaf 100644
--- a/lib/meme.ex
+++ b/lib/meme.ex
@@ -36,7 +36,7 @@ defmodule Thulani.Meme do
defp require_nil(:text), do: dynamic([m], is_nil(m.content))
@spec by_name(String.t()) :: Meme
- def by_name(name) when is_bitstring(name) do
+ def by_name(name) when is_binary(name) do
from(m in Meme, where: m.title == ^name)
end
end
diff --git a/lib/util/stream.ex b/lib/util/stream.ex
new file mode 100644
index 0000000..b143bc1
--- /dev/null
+++ b/lib/util/stream.ex
@@ -0,0 +1,19 @@
+defmodule Thulani.Util.Stream do
+ def take_until(enum, fun) when :erlang.is_function(fun, 1) do
+ lazy(enum, fn f1 ->
+ fn entry, acc ->
+ if(fun.(entry)) do
+ {:cont, result} = f1.(entry, acc)
+ {:halt, result}
+ else
+ f1.(entry, acc)
+ end
+ end
+ end)
+ end
+
+ @compile {:inline, lazy: 2}
+
+ defp lazy(%Stream{done: nil, funs: funs} = lazy, fun), do: %{lazy | funs: [fun | funs]}
+ defp lazy(enum, fun), do: %Stream{enum: enum, funs: [fun]}
+end