Skip to content

Commit 1456f2a

Browse files
authored
Merge pull request #3187 from bastimeyer/plugins/twitch/filter-timeout
Add FilteredHLSStream
2 parents e5164d3 + 61a95a2 commit 1456f2a

File tree

3 files changed

+322
-6
lines changed

3 files changed

+322
-6
lines changed

src/streamlink/plugins/twitch.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from streamlink.stream import (
1818
HTTPStream, HLSStream, FLVPlaylist, extract_flv_header_tags
1919
)
20-
from streamlink.stream.hls import HLSStreamReader, HLSStreamWriter, HLSStreamWorker
20+
from streamlink.stream.hls import HLSStreamWorker
21+
from streamlink.stream.hls_filtered import FilteredHLSStreamWriter, FilteredHLSStreamReader
2122
from streamlink.stream.hls_playlist import M3U8Parser, load as load_hls_playlist
2223
from streamlink.utils.times import hours_minutes_seconds
2324

@@ -199,13 +200,12 @@ def process_sequences(self, playlist, sequences):
199200
return super(TwitchHLSStreamWorker, self).process_sequences(playlist, sequences)
200201

201202

202-
class TwitchHLSStreamWriter(HLSStreamWriter):
203-
def write(self, sequence, *args, **kwargs):
204-
if not (self.stream.disable_ads and sequence.segment.ad):
205-
return super(TwitchHLSStreamWriter, self).write(sequence, *args, **kwargs)
203+
class TwitchHLSStreamWriter(FilteredHLSStreamWriter):
204+
def should_filter_sequence(self, sequence):
205+
return self.stream.disable_ads and sequence.segment.ad
206206

207207

208-
class TwitchHLSStreamReader(HLSStreamReader):
208+
class TwitchHLSStreamReader(FilteredHLSStreamReader):
209209
__worker__ = TwitchHLSStreamWorker
210210
__writer__ = TwitchHLSStreamWriter
211211

src/streamlink/stream/hls_filtered.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import logging
2+
from threading import Event
3+
4+
from .hls import HLSStreamWriter, HLSStreamReader
5+
6+
7+
log = logging.getLogger(__name__)
8+
9+
10+
class FilteredHLSStreamWriter(HLSStreamWriter):
11+
def should_filter_sequence(self, sequence):
12+
return False
13+
14+
def write(self, sequence, *args, **kwargs):
15+
if not self.should_filter_sequence(sequence):
16+
try:
17+
return super(FilteredHLSStreamWriter, self).write(sequence, *args, **kwargs)
18+
finally:
19+
# unblock reader thread after writing data to the buffer
20+
if not self.reader.filter_event.is_set():
21+
log.info("Resuming stream output")
22+
self.reader.filter_event.set()
23+
24+
# block reader thread if filtering out segments
25+
elif self.reader.filter_event.is_set():
26+
log.info("Filtering out segments and pausing stream output")
27+
self.reader.filter_event.clear()
28+
29+
30+
class FilteredHLSStreamReader(HLSStreamReader):
31+
def __init__(self, *args, **kwargs):
32+
super(FilteredHLSStreamReader, self).__init__(*args, **kwargs)
33+
self.filter_event = Event()
34+
self.filter_event.set()
35+
36+
def read(self, size):
37+
while True:
38+
try:
39+
return super(FilteredHLSStreamReader, self).read(size)
40+
except IOError:
41+
# wait indefinitely until filtering ends
42+
self.filter_event.wait()
43+
if self.buffer.closed:
44+
return b""
45+
# if data is available, try reading again
46+
if self.buffer.length > 0:
47+
continue
48+
# raise if not filtering and no data available
49+
raise
50+
51+
def close(self):
52+
super(FilteredHLSStreamReader, self).close()
53+
self.filter_event.set()

tests/streams/test_hls_filtered.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import requests_mock
2+
from tests.mock import MagicMock, call, patch
3+
import unittest
4+
5+
import itertools
6+
from textwrap import dedent
7+
from threading import Event, Thread
8+
9+
from streamlink.session import Streamlink
10+
from streamlink.stream.hls import HLSStream
11+
from streamlink.stream.hls_filtered import FilteredHLSStreamWriter, FilteredHLSStreamReader
12+
13+
14+
class _TestSubjectFilteredHLSStreamWriter(FilteredHLSStreamWriter):
15+
def __init__(self, *args, **kwargs):
16+
super(_TestSubjectFilteredHLSStreamWriter, self).__init__(*args, **kwargs)
17+
self.write_wait = Event()
18+
self.write_done = Event()
19+
20+
def write(self, *args, **kwargs):
21+
# only write once per step
22+
self.write_wait.wait()
23+
self.write_wait.clear()
24+
25+
# don't write again during cleanup
26+
if self.closed:
27+
return
28+
29+
super(_TestSubjectFilteredHLSStreamWriter, self).write(*args, **kwargs)
30+
31+
# notify main thread that writing has finished
32+
self.write_done.set()
33+
34+
35+
class _TestSubjectFilteredHLSReader(FilteredHLSStreamReader):
36+
__writer__ = _TestSubjectFilteredHLSStreamWriter
37+
38+
39+
class _TestSubjectReadThread(Thread):
40+
"""
41+
Run the reader on a separate thread, so that each read can be controlled from within the main thread
42+
"""
43+
def __init__(self, segments, playlists):
44+
Thread.__init__(self)
45+
self.daemon = True
46+
47+
self.mocks = mocks = {}
48+
self.mock = requests_mock.Mocker()
49+
self.mock.start()
50+
51+
def addmock(method, url, *args, **kwargs):
52+
mocks[url] = method(url, *args, **kwargs)
53+
54+
addmock(self.mock.get, TestFilteredHLSStream.url_playlist, [{"text": p} for p in playlists])
55+
for i, segment in enumerate(segments):
56+
addmock(self.mock.get, TestFilteredHLSStream.url_segment.format(i), content=segment)
57+
58+
session = Streamlink()
59+
session.set_option("hls-live-edge", 2)
60+
session.set_option("hls-timeout", 0)
61+
session.set_option("stream-timeout", 0)
62+
63+
self.read_wait = Event()
64+
self.read_done = Event()
65+
self.data = []
66+
self.error = None
67+
68+
self.stream = HLSStream(session, TestFilteredHLSStream.url_playlist)
69+
self.reader = _TestSubjectFilteredHLSReader(self.stream)
70+
self.reader.open()
71+
72+
def run(self):
73+
while True:
74+
# only read once per step
75+
self.read_wait.wait()
76+
self.read_wait.clear()
77+
78+
# don't read again during cleanup
79+
if self.reader.closed:
80+
return
81+
82+
try:
83+
data = self.reader.read(-1)
84+
self.data.append(data)
85+
except IOError as err:
86+
self.error = err
87+
return
88+
finally:
89+
# notify main thread that reading has finished
90+
self.read_done.set()
91+
92+
def cleanup(self):
93+
self.reader.close()
94+
self.mock.stop()
95+
# make sure that write and read threads halts on cleanup
96+
self.reader.writer.write_wait.set()
97+
self.read_wait.set()
98+
99+
def await_write(self):
100+
writer = self.reader.writer
101+
if not writer.closed:
102+
# make one write call and wait until write call has finished
103+
writer.write_wait.set()
104+
writer.write_done.wait()
105+
writer.write_done.clear()
106+
107+
def await_read(self):
108+
if not self.reader.closed:
109+
# make one read call and wait until read call has finished
110+
self.read_wait.set()
111+
self.read_done.wait()
112+
self.read_done.clear()
113+
114+
115+
@patch("streamlink.stream.hls.HLSStreamWorker.wait", MagicMock(return_value=True))
116+
class TestFilteredHLSStream(unittest.TestCase):
117+
url_playlist = "http://mocked/path/playlist.m3u8"
118+
url_segment = "http://mocked/path/stream{0}.ts"
119+
120+
@classmethod
121+
def get_segments(cls, num):
122+
return ["[{0}]".format(i).encode("ascii") for i in range(num)]
123+
124+
@classmethod
125+
def get_playlist(cls, media_sequence, items, filtered=False, end=False):
126+
playlist = dedent("""
127+
#EXTM3U
128+
#EXT-X-VERSION:5
129+
#EXT-X-TARGETDURATION:1
130+
#EXT-X-MEDIA-SEQUENCE:{0}
131+
""".format(media_sequence))
132+
133+
for item in items:
134+
playlist += "#EXTINF:1.000,{1}\nstream{0}.ts\n".format(item, "filtered" if filtered else "live")
135+
136+
if end:
137+
playlist += "#EXT-X-ENDLIST\n"
138+
139+
return playlist
140+
141+
@classmethod
142+
def filter_sequence(cls, sequence):
143+
return sequence.segment.title == "filtered"
144+
145+
def subject(self, segments, playlists):
146+
thread = _TestSubjectReadThread(segments, playlists)
147+
self.addCleanup(thread.cleanup)
148+
thread.start()
149+
150+
return thread, thread.reader, thread.reader.writer
151+
152+
@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
153+
@patch("streamlink.stream.hls_filtered.log")
154+
def test_filtered_logging(self, mock_log):
155+
segments = self.get_segments(8)
156+
thread, reader, writer = self.subject(segments, [
157+
self.get_playlist(0, [0, 1], filtered=True),
158+
self.get_playlist(2, [2, 3], filtered=False),
159+
self.get_playlist(4, [4, 5], filtered=True),
160+
self.get_playlist(6, [6, 7], filtered=False, end=True)
161+
])
162+
163+
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
164+
165+
for i in range(2):
166+
thread.await_write()
167+
thread.await_write()
168+
self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 1)
169+
self.assertEqual(mock_log.info.mock_calls[i * 2 + 0], call("Filtering out segments and pausing stream output"))
170+
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
171+
172+
thread.await_write()
173+
thread.await_write()
174+
self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 2)
175+
self.assertEqual(mock_log.info.mock_calls[i * 2 + 1], call("Resuming stream output"))
176+
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
177+
178+
thread.await_read()
179+
180+
self.assertEqual(
181+
b"".join(thread.data),
182+
b"".join(list(itertools.chain(segments[2:4], segments[6:8]))),
183+
"Correctly filters out segments"
184+
)
185+
for i, _ in enumerate(segments):
186+
self.assertTrue(thread.mocks[TestFilteredHLSStream.url_segment.format(i)].called, "Downloads all segments")
187+
188+
# don't patch should_filter_sequence here (it always returns False)
189+
def test_not_filtered(self):
190+
segments = self.get_segments(2)
191+
thread, reader, writer = self.subject(segments, [
192+
self.get_playlist(0, [0, 1], filtered=True, end=True)
193+
])
194+
195+
thread.await_write()
196+
thread.await_write()
197+
thread.await_read()
198+
self.assertEqual(b"".join(thread.data), b"".join(segments[0:2]), "Does not filter by default")
199+
200+
@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
201+
def test_filtered_timeout(self):
202+
segments = self.get_segments(2)
203+
thread, reader, writer = self.subject(segments, [
204+
self.get_playlist(0, [0, 1], filtered=False, end=True)
205+
])
206+
207+
thread.await_write()
208+
thread.await_read()
209+
self.assertEqual(thread.data, segments[0:1], "Has read the first segment")
210+
211+
# simulate a timeout by having an empty buffer
212+
# timeout value is set to 0
213+
thread.await_read()
214+
self.assertIsInstance(thread.error, IOError, "Raises a timeout error when no data is available to read")
215+
216+
@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
217+
def test_filtered_no_timeout(self):
218+
segments = self.get_segments(4)
219+
thread, reader, writer = self.subject(segments, [
220+
self.get_playlist(0, [0, 1], filtered=True),
221+
self.get_playlist(2, [2, 3], filtered=False, end=True)
222+
])
223+
224+
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
225+
226+
thread.await_write()
227+
thread.await_write()
228+
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
229+
230+
# make reader read (no data available yet)
231+
thread.read_wait.set()
232+
# once data becomes available, the reader continues reading
233+
thread.await_write()
234+
self.assertTrue(reader.filter_event.is_set(), "Reader is not waiting anymore")
235+
236+
thread.read_done.wait()
237+
thread.read_done.clear()
238+
self.assertFalse(thread.error, "Doesn't time out when filtering")
239+
self.assertEqual(thread.data, segments[2:3], "Reads next available buffer data")
240+
241+
thread.await_write()
242+
thread.await_read()
243+
self.assertEqual(thread.data, segments[2:4])
244+
245+
@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
246+
def test_filtered_closed(self):
247+
segments = self.get_segments(2)
248+
thread, reader, writer = self.subject(segments, [
249+
self.get_playlist(0, [0, 1], filtered=True)
250+
])
251+
252+
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
253+
thread.await_write()
254+
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
255+
256+
# make reader read (no data available yet)
257+
thread.read_wait.set()
258+
259+
# close stream while reader is waiting for filtering to end
260+
thread.reader.close()
261+
thread.read_done.wait()
262+
thread.read_done.clear()
263+
self.assertEqual(thread.data, [b""], "Stops reading on stream close")

0 commit comments

Comments
 (0)