forked from nikivanov/watney
-
Notifications
You must be signed in to change notification settings - Fork 0
/
audiomanager.py
115 lines (98 loc) · 4.45 KB
/
audiomanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import concurrent
import concurrent.futures
import threading
import time
import uuid

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject

from events import Events
# gst-launch style description of the two-way audio pipeline; AudioManager
# passes this string to Gst.parse_launch().
#
# The trailing backslashes are string-literal line continuations, so the
# element chains below end up joined into a single line of text.
#
# Receive path: udpsrc on port 50000 (RTP, Opus, clock-rate 48000) and
# port 50001 (RTCP) feed rtpbin; its output is depayloaded, decoded,
# resampled to 16 kHz mono S16LE, passed through the webrtcechoprobe element
# named "echoprobe" and handed to alsasink.
#
# Send path: alsasrc (plughw:0,0) -> webrtcdsp (probe=echoprobe) -> the two
# volume elements "vol1" (10) and "vol2" (3) -> opusenc -> rtpopuspay ->
# udpsink at 127.0.0.1:8005. AudioManager looks up "vol1"/"vol2" by name to
# change their volume at runtime.
PIPELINE = '''
rtpbin name=rtpbin latency=100 \
udpsrc port=50000 caps="application/x-rtp, media=audio, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \
udpsrc port=50001 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_0 \
rtpbin. ! rtpopusdepay ! queue ! opusdec ! audioresample ! audio/x-raw,format=S16LE,layout=interleaved,rate=16000,channels=1 ! webrtcechoprobe name=echoprobe ! alsasink \
alsasrc device=plughw:0,0 buffer-time=20000 ! audio/x-raw,format=S16LE,layout=interleaved,rate=16000,channels=1 ! webrtcdsp probe=echoprobe echo-suppression-level=high ! \
volume name=vol1 volume=10 ! volume name=vol2 volume=3 ! queue ! opusenc ! rtpopuspay ! udpsink host=127.0.0.1 port=8005
'''
class AudioManager:
    """Run the GStreamer audio pipeline while at least one session is active.

    The pipeline described by ``PIPELINE`` is started on a background thread
    when the first session begins and torn down when the last one ends.
    Callers can temporarily drop the ``vol1``/``vol2`` gains with
    ``lowerVolume(token)`` and undo that with ``restoreVolume(token)``; the
    gains are put back only once every token has been released and the
    configured delay has elapsed.
    """

    def __init__(self, config):
        """Set up state and subscribe to session start/end events.

        Args:
            config: mapping-like object; ``config["AUDIO"]["VolumeRestoreDelay"]``
                is read as the number of seconds to wait before restoring
                the volume.
        """
        self.pause = float(config["AUDIO"]["VolumeRestoreDelay"])
        self.pipelineReady = False
        # These are assigned by the worker thread / session callbacks later;
        # give them defined values so early calls cannot hit AttributeError.
        self.shuttingDown = False
        self.pipeline = None
        self.mainloop = None
        self.audioManagerThread = None
        self.volumeLock = threading.Lock()
        self.sessionLock = threading.Lock()
        self.sessionCounter = 0
        self.delayedExecutor = concurrent.futures.ThreadPoolExecutor()
        self.tokenSet = set()
        self.unmuteId = None
        # Subscribe last, so a callback can never observe a half-built object.
        Events.getInstance().sessionStarted.append(lambda: self.onSessionStarted())
        Events.getInstance().sessionEnded.append(lambda: self.onSessionEnded())

    def onSessionStarted(self):
        """Count a new session; the first one starts the pipeline thread."""
        with self.sessionLock:
            if self.sessionCounter == 0:
                self.shuttingDown = False
                self.tokenSet.clear()
                self.audioManagerThread = threading.Thread(
                    name='audioManagerLoop',
                    target=self.__runLoop,
                    daemon=True,
                )
                self.audioManagerThread.start()
            self.sessionCounter += 1

    def onSessionEnded(self):
        """Count a finished session; the last one stops the pipeline thread.

        An end without a matching start is ignored, so the counter can never
        go negative and block later sessions from starting the pipeline.
        """
        with self.sessionLock:
            if self.sessionCounter <= 0:
                return
            self.sessionCounter -= 1
            if self.sessionCounter == 0:
                self.shuttingDown = True
                self.destroyPipeline()
                worker = self.audioManagerThread
                if worker is not None:
                    worker.join()
                self.audioManagerThread = None

    def __runLoop(self):
        """Thread body: (re)build and run the pipeline until shutdown.

        Each iteration parses ``PIPELINE``, sets it to PLAYING and blocks in
        a main loop. When the main loop is quit (see ``destroyPipeline``) the
        pipeline is rebuilt unless ``shuttingDown`` has been set.
        """
        Gst.init(None)
        while not self.shuttingDown:
            print('Starting audio pipeline')
            self.pipeline = Gst.parse_launch(PIPELINE)
            bus = self.pipeline.get_bus()
            bus.add_signal_watch()
            bus.connect("message", self.on_message)
            self.pipeline.set_state(Gst.State.PLAYING)
            print('Audio pipeline started')
            GObject.threads_init()
            self.pipelineReady = True
            self.mainloop = GObject.MainLoop().new(None, False)
            self.mainloop.run()
            self.pipelineReady = False

    def on_message(self, bus, message):
        """Bus callback: tear the pipeline down on EOS or ERROR messages."""
        kind = message.type
        if kind == Gst.MessageType.EOS:
            print('Audio pipeline EOS')
            self.destroyPipeline()
        elif kind == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print('Audio pipeline error: {} {}'.format(err, debug))
            self.destroyPipeline()

    def destroyPipeline(self):
        """Stop the current pipeline and quit the main loop, if they exist.

        Safe to call before the worker thread has created either object.
        """
        # NOTE(review): if this runs after the worker has created the main
        # loop but before it has entered run(), the quit() may have no effect
        # and the loop would keep running - confirm against GLib semantics.
        print("Stopping audio pipeline")
        pipeline = self.pipeline
        if pipeline is not None:
            pipeline.get_bus().remove_signal_watch()
            pipeline.set_state(Gst.State.NULL)
        mainloop = self.mainloop
        if mainloop is not None:
            mainloop.quit()

    def _setVolume(self, vol1, vol2):
        """Set the ``volume`` property of the ``vol1`` and ``vol2`` elements."""
        pipeline = self.pipeline
        if pipeline is None:
            return
        pipeline.get_by_name('vol1').set_property('volume', vol1)
        pipeline.get_by_name('vol2').set_property('volume', vol2)

    def lowerVolume(self, token):
        """Drop both gains to 1 and remember ``token`` as a holder.

        Does nothing while the pipeline is not running. The gains are only
        written when no other token currently holds the volume down.
        """
        if not self.pipelineReady:
            return
        with self.volumeLock:
            if not self.tokenSet:
                self._setVolume(1, 1)
            self.tokenSet.add(token)

    def restoreVolume(self, token):
        """Release ``token``; once none remain, schedule a delayed restore.

        Does nothing while the pipeline is not running.
        """
        if not self.pipelineReady:
            return
        with self.volumeLock:
            self.tokenSet.discard(token)
            if not self.tokenSet:
                # Tag this request so an older pending restore can tell that
                # it has been superseded.
                unmuteId = uuid.uuid4()
                self.unmuteId = unmuteId
                self.delayedExecutor.submit(self.__restoreVolume, unmuteId)

    def __restoreVolume(self, unmuteId):
        """Executor task: after ``self.pause`` seconds, restore the gains.

        The restore is skipped if a token was taken again in the meantime,
        if a newer restore request replaced this one, or if the pipeline is
        no longer running.
        """
        time.sleep(self.pause)
        with self.volumeLock:
            if self.tokenSet or self.unmuteId != unmuteId:
                return
            if not self.pipelineReady:
                return
            self._setVolume(10, 3)