aboutsummaryrefslogtreecommitdiff
path: root/downloader/download_manager.go
blob: 8f9e8a389b1918efedb76fb973f92dbf59c79999 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package downloader

import (
	"time"

	"github.com/bwmarrin/discordgo"
)

// DlMessage is a playback control message. Values are sent on
// DownloadManager.PlayState and fanned out to the manager's internal
// goroutines.
type DlMessage int

// Playback states understood by the packet proxy.
const (
	// Clear makes the packet proxy discard bundles waiting on its input
	// channel.
	Clear DlMessage = iota
	// Pause makes the packet proxy stop forwarding packets until another
	// state message arrives.
	Pause
	// Play makes the packet proxy forward packets to the voice connection.
	// It is the state every new bundle starts in.
	Play
)

// playBundle pairs a stream of packets with the voice connection they are to
// be played on. It is handed from playFromQueue to proxyOpusPackets.
type playBundle struct {
	// data yields the packets that are forwarded to conn.OpusSend
	// (presumably encoded Opus frames, going by the consumer's name).
	data <-chan []byte
	// conn is the voice connection the packets are sent on.
	conn *discordgo.VoiceConnection
}

// DownloadManager queues downloads and plays them one after another on a
// single voice channel. Create one with NewManager, which also starts the
// goroutines that service the channels below.
type DownloadManager struct {
	// session, guildID and voiceID are the arguments used to join the voice
	// channel before each download is played.
	session *discordgo.Session
	guildID string
	voiceID string
	// dls is the queue of pending downloads, fed by Enqueue and consumed by
	// playFromQueue.
	dls     chan *downloader

	// PlayState accepts control messages (Clear, Pause, Play) from callers.
	// Each message is copied to playStateChan and then to proxyStateChan by
	// teeStateMessages.
	PlayState      chan DlMessage
	playStateChan  chan DlMessage
	proxyStateChan chan DlMessage

	// proxyChan carries bundles from playFromQueue to proxyOpusPackets.
	proxyChan chan playBundle
}

// proxyBufSize is a buffer size for the packet proxy.
// NOTE(review): not referenced anywhere in the visible code (all channels are
// created unbuffered) — confirm whether it is used elsewhere in the package or
// is a leftover.
const proxyBufSize = 512

// NewManager returns a DownloadManager that plays queued downloads on the
// voice channel voiceChanID of guild guildID using session s.
//
// Before returning it starts three goroutines: one fanning out PlayState
// messages, one forwarding packets to the voice connection, and one consuming
// the download queue. The visible code provides no way to stop them.
func NewManager(s *discordgo.Session, guildID string, voiceChanID string) *DownloadManager {
	mgr := new(DownloadManager)

	mgr.session = s
	mgr.guildID = guildID
	mgr.voiceID = voiceChanID

	// Every channel is unbuffered, so each hand-off between goroutines is a
	// rendezvous.
	mgr.dls = make(chan *downloader)
	mgr.PlayState = make(chan DlMessage)
	mgr.playStateChan = make(chan DlMessage)
	mgr.proxyStateChan = make(chan DlMessage)
	mgr.proxyChan = make(chan playBundle)

	workers := []func(){
		mgr.teeStateMessages,
		mgr.proxyOpusPackets,
		mgr.playFromQueue,
	}
	for _, worker := range workers {
		go worker()
	}

	return mgr
}

// teeStateMessages copies every message received on PlayState to
// playStateChan and then to proxyStateChan, returning once PlayState is
// closed.
//
// Both sends block, so a message must be taken by both consumers before the
// next one is read from PlayState.
// NOTE(review): no receiver for playStateChan appears in the visible code; if
// none exists elsewhere in the package the first message blocks here forever
// — confirm.
func (m *DownloadManager) teeStateMessages() {
	for {
		state, open := <-m.PlayState
		if !open {
			return
		}
		m.playStateChan <- state
		m.proxyStateChan <- state
	}
}

// proxyOpusPackets takes one playBundle at a time from proxyChan and streams
// it to its voice connection. It returns when proxyChan is closed.
func (m *DownloadManager) proxyOpusPackets() {
	for bundle := range m.proxyChan {
		m.proxyBundle(bundle)
	}
}

// proxyBundle forwards packets from bundle.data to bundle.conn.OpusSend while
// obeying state messages arriving on proxyStateChan.
//
// It returns when bundle.data is closed (the track is finished) or when a
// Clear message is received; on Clear, any bundle currently being offered on
// proxyChan is discarded as well. In every case the speaking flag is turned
// off and the voice connection is disconnected before returning.
func (m *DownloadManager) proxyBundle(bundle playBundle) {
	// Speaking/Disconnect failures are deliberately ignored: there is no
	// caller to report them to and playback should proceed on a best-effort
	// basis.
	_ = bundle.conn.Speaking(true)
	defer func() {
		_ = bundle.conn.Speaking(false)
		_ = bundle.conn.Disconnect()
	}()

	var (
		state   = Play
		pending []byte // packet read from the source but not yet sent

		// Exactly one of in/out is non-nil at any time. A nil channel never
		// becomes ready in a select, so toggling them alternates between
		// "read the next packet" and "send the pending packet" while still
		// reacting to state messages in both phases.
		in  = bundle.data
		out chan<- []byte
	)

	for {
		switch state {
		case Clear:
			// Drop every bundle that is ready right now, then stop this one.
			for {
				select {
				case <-m.proxyChan:
				default:
					return
				}
			}
		case Play:
			select {
			case state = <-m.proxyStateChan:
			case pkt, ok := <-in:
				if !ok {
					// Source exhausted: the track is over.
					return
				}
				pending, in, out = pkt, nil, bundle.conn.OpusSend
			case out <- pending:
				pending, in, out = nil, bundle.data, nil
			}
		default:
			// Pause, or an unrecognised value: forward nothing and block until
			// the next state message instead of spinning. A pending packet is
			// kept and sent once playback resumes.
			state = <-m.proxyStateChan
		}
	}
}

// playFromQueue consumes the download queue until dls is closed. For each
// download it joins the configured voice channel, starts the download, hands
// the packet stream and voice connection to the packet proxy, and waits for
// the download to signal completion before taking the next one.
//
// If joining the voice channel fails the download is dropped, the error is
// logged, and after a one-second back-off the loop moves on to the next
// queued download. The loop must not exit on such a failure: dls is
// unbuffered, so with no consumer every later Enqueue call would block
// forever.
func (m *DownloadManager) playFromQueue() {
	for dl := range m.dls {
		conn, err := m.session.ChannelVoiceJoin(m.guildID, m.voiceID, false, false)
		if err != nil {
			log.Errorf("unable to connect to the voice channel: %q", err)
			time.Sleep(1 * time.Second)
			continue
		}

		out, done := dl.Start()
		m.proxyChan <- playBundle{
			data: out,
			conn: conn,
		}
		<-done
	}
}

// Enqueue creates a download for url with the given startTime and duration
// and appends it to the play queue.
//
// It returns the error from newDownload if the download cannot be created.
// Otherwise it blocks until the queue consumer accepts the download (the
// queue channel is unbuffered) and returns nil.
func (m *DownloadManager) Enqueue(url string, startTime, duration time.Duration) error {
	item, err := newDownload(url, startTime, duration)
	if err == nil {
		m.dls <- item
	}
	return err
}