Add FilteredHLSStream #3187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged: gravyboat merged 2 commits into streamlink:master from bastimeyer:plugins/twitch/filter-timeout on Sep 21, 2020. The changes below are from the first of the two commits.
stream.hls_filtered: implement FilteredHLSStream
Adds subclasses of HLSStreamReader and HLSStreamWriter for filtering out sequences and for ignoring the read timeout while filtering.
commit 6b22aca9113ea7bb08044dcc3b10812ffe1fa2cd
streamlink/stream/hls_filtered.py (new file, +53 lines):
import logging
from threading import Event

from .hls import HLSStreamWriter, HLSStreamReader


log = logging.getLogger(__name__)


class FilteredHLSStreamWriter(HLSStreamWriter):
    def should_filter_sequence(self, sequence):
        return False

    def write(self, sequence, *args, **kwargs):
        if not self.should_filter_sequence(sequence):
            try:
                return super(FilteredHLSStreamWriter, self).write(sequence, *args, **kwargs)
            finally:
                # unblock reader thread after writing data to the buffer
                if not self.reader.filter_event.is_set():
                    log.info("Resuming stream output")
                    self.reader.filter_event.set()

        # block reader thread if filtering out segments
        elif self.reader.filter_event.is_set():
            log.info("Filtering out segments and pausing stream output")
            self.reader.filter_event.clear()


class FilteredHLSStreamReader(HLSStreamReader):
    def __init__(self, *args, **kwargs):
        super(FilteredHLSStreamReader, self).__init__(*args, **kwargs)
        self.filter_event = Event()
        self.filter_event.set()

    def read(self, size):
        while True:
            try:
                return super(FilteredHLSStreamReader, self).read(size)
            except IOError:
                # wait indefinitely until filtering ends
                self.filter_event.wait()
                if self.buffer.closed:
                    return b""
                # if data is available, try reading again
                if self.buffer.length > 0:
                    continue
                # raise if not filtering and no data available
                raise

    def close(self):
        super(FilteredHLSStreamReader, self).close()
        self.filter_event.set()
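The base classes are deliberately inert: should_filter_sequence() always returns False, so nothing is filtered until a subclass overrides it. Below is a minimal sketch of how a plugin could hook in; the class names and the "ad" title check are made up for illustration and are not part of this PR (the __writer__ hook is the same one the tests below use):

from streamlink.stream.hls_filtered import FilteredHLSStreamReader, FilteredHLSStreamWriter


class AdFilteringHLSStreamWriter(FilteredHLSStreamWriter):
    def should_filter_sequence(self, sequence):
        # hypothetical criterion: drop segments whose EXTINF title marks an ad break
        return sequence.segment.title == "ad"


class AdFilteringHLSStreamReader(FilteredHLSStreamReader):
    # inject the custom writer, as the test subjects below also do
    __writer__ = AdFilteringHLSStreamWriter

With this in place, the writer drops matching segments before they reach the buffer, and the reader's filter_event keeps read() from raising a timeout until unfiltered data arrives.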
tests/test_hls_filtered.py (new file, +263 lines):
import requests_mock
from tests.mock import MagicMock, call, patch
import unittest

import itertools
from textwrap import dedent
from threading import Event, Thread

from streamlink.session import Streamlink
from streamlink.stream.hls import HLSStream
from streamlink.stream.hls_filtered import FilteredHLSStreamWriter, FilteredHLSStreamReader


class _TestSubjectFilteredHLSStreamWriter(FilteredHLSStreamWriter):
    def __init__(self, *args, **kwargs):
        super(_TestSubjectFilteredHLSStreamWriter, self).__init__(*args, **kwargs)
        self.write_wait = Event()
        self.write_done = Event()

    def write(self, *args, **kwargs):
        # only write once per step
        self.write_wait.wait()
        self.write_wait.clear()

        # don't write again during cleanup
        if self.closed:
            return

        super(_TestSubjectFilteredHLSStreamWriter, self).write(*args, **kwargs)

        # notify main thread that writing has finished
        self.write_done.set()


class _TestSubjectFilteredHLSReader(FilteredHLSStreamReader):
    __writer__ = _TestSubjectFilteredHLSStreamWriter


class _TestSubjectReadThread(Thread):
    """
    Run the reader on a separate thread, so that each read can be controlled from within the main thread
    """
    def __init__(self, segments, playlists):
        Thread.__init__(self)
        self.daemon = True

        self.mocks = mocks = {}
        self.mock = requests_mock.Mocker()
        self.mock.start()

        def addmock(method, url, *args, **kwargs):
            mocks[url] = method(url, *args, **kwargs)

        addmock(self.mock.get, TestFilteredHLSStream.url_playlist, [{"text": p} for p in playlists])
        for i, segment in enumerate(segments):
            addmock(self.mock.get, TestFilteredHLSStream.url_segment.format(i), content=segment)

        session = Streamlink()
        session.set_option("hls-live-edge", 2)
        session.set_option("hls-timeout", 0)
        session.set_option("stream-timeout", 0)

        self.read_wait = Event()
        self.read_done = Event()
        self.data = []
        self.error = None

        self.stream = HLSStream(session, TestFilteredHLSStream.url_playlist)
        self.reader = _TestSubjectFilteredHLSReader(self.stream)
        self.reader.open()

    def run(self):
        while True:
            # only read once per step
            self.read_wait.wait()
            self.read_wait.clear()

            # don't read again during cleanup
            if self.reader.closed:
                return

            try:
                data = self.reader.read(-1)
                self.data.append(data)
            except IOError as err:
                self.error = err
                return
            finally:
                # notify main thread that reading has finished
                self.read_done.set()

    def cleanup(self):
        self.reader.close()
        self.mock.stop()
        # make sure that the write and read threads halt on cleanup
        self.reader.writer.write_wait.set()
        self.read_wait.set()

    def await_write(self):
        writer = self.reader.writer
        if not writer.closed:
            # make one write call and wait until the write call has finished
            writer.write_wait.set()
            writer.write_done.wait()
            writer.write_done.clear()

    def await_read(self):
        if not self.reader.closed:
            # make one read call and wait until the read call has finished
            self.read_wait.set()
            self.read_done.wait()
            self.read_done.clear()


@patch("streamlink.stream.hls.HLSStreamWorker.wait", MagicMock(return_value=True))
class TestFilteredHLSStream(unittest.TestCase):
    url_playlist = "http://mocked/path/playlist.m3u8"
    url_segment = "http://mocked/path/stream{0}.ts"

    @classmethod
    def get_segments(cls, num):
        return ["[{0}]".format(i).encode("ascii") for i in range(num)]

    @classmethod
    def get_playlist(cls, media_sequence, items, filtered=False, end=False):
        playlist = dedent("""
            #EXTM3U
            #EXT-X-VERSION:5
            #EXT-X-TARGETDURATION:1
            #EXT-X-MEDIA-SEQUENCE:{0}
        """.format(media_sequence))

        for item in items:
            playlist += "#EXTINF:1.000,{1}\nstream{0}.ts\n".format(item, "filtered" if filtered else "live")

        if end:
            playlist += "#EXT-X-ENDLIST\n"

        return playlist

    @classmethod
    def filter_sequence(cls, sequence):
        return sequence.segment.title == "filtered"

    def subject(self, segments, playlists):
        thread = _TestSubjectReadThread(segments, playlists)
        self.addCleanup(thread.cleanup)
        thread.start()

        return thread, thread.reader, thread.reader.writer

    @patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
    @patch("streamlink.stream.hls_filtered.log")
    def test_filtered_logging(self, mock_log):
        segments = self.get_segments(8)
        thread, reader, writer = self.subject(segments, [
            self.get_playlist(0, [0, 1], filtered=True),
            self.get_playlist(2, [2, 3], filtered=False),
            self.get_playlist(4, [4, 5], filtered=True),
            self.get_playlist(6, [6, 7], filtered=False, end=True)
        ])

        self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

        for i in range(2):
            thread.await_write()
            thread.await_write()
            self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 1)
            self.assertEqual(mock_log.info.mock_calls[i * 2 + 0], call("Filtering out segments and pausing stream output"))
            self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")

            thread.await_write()
            thread.await_write()
            self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 2)
            self.assertEqual(mock_log.info.mock_calls[i * 2 + 1], call("Resuming stream output"))
            self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

            thread.await_read()

        self.assertEqual(
            b"".join(thread.data),
            b"".join(list(itertools.chain(segments[2:4], segments[6:8]))),
            "Correctly filters out segments"
        )
        for i, _ in enumerate(segments):
            self.assertTrue(thread.mocks[TestFilteredHLSStream.url_segment.format(i)].called, "Downloads all segments")

    # don't patch should_filter_sequence here (it always returns False)
    def test_not_filtered(self):
        segments = self.get_segments(2)
        thread, reader, writer = self.subject(segments, [
            self.get_playlist(0, [0, 1], filtered=True, end=True)
        ])

        thread.await_write()
        thread.await_write()
        thread.await_read()
        self.assertEqual(b"".join(thread.data), b"".join(segments[0:2]), "Does not filter by default")

    @patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
    def test_filtered_timeout(self):
        segments = self.get_segments(2)
        thread, reader, writer = self.subject(segments, [
            self.get_playlist(0, [0, 1], filtered=False, end=True)
        ])

        thread.await_write()
        thread.await_read()
        self.assertEqual(thread.data, segments[0:1], "Has read the first segment")

        # simulate a timeout by having an empty buffer
        # (the timeout value is set to 0)
        thread.await_read()
        self.assertIsInstance(thread.error, IOError, "Raises a timeout error when no data is available to read")

    @patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
    def test_filtered_no_timeout(self):
        segments = self.get_segments(4)
        thread, reader, writer = self.subject(segments, [
            self.get_playlist(0, [0, 1], filtered=True),
            self.get_playlist(2, [2, 3], filtered=False, end=True)
        ])

        self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

        thread.await_write()
        thread.await_write()
        self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")

        # make reader read (no data available yet)
        thread.read_wait.set()
        # once data becomes available, the reader continues reading
        thread.await_write()
        self.assertTrue(reader.filter_event.is_set(), "Reader is not waiting anymore")

        thread.read_done.wait()
        thread.read_done.clear()
        self.assertFalse(thread.error, "Doesn't time out when filtering")
        self.assertEqual(thread.data, segments[2:3], "Reads next available buffer data")

        thread.await_write()
        thread.await_read()
        self.assertEqual(thread.data, segments[2:4])

    @patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
    def test_filtered_closed(self):
        segments = self.get_segments(2)
        thread, reader, writer = self.subject(segments, [
            self.get_playlist(0, [0, 1], filtered=True)
        ])

        self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
        thread.await_write()
        self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")

        # make reader read (no data available yet)
        thread.read_wait.set()

        # close stream while reader is waiting for filtering to end
        thread.reader.close()
        thread.read_done.wait()
        thread.read_done.clear()
        self.assertEqual(thread.data, [b""], "Stops reading on stream close")
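For reference, here is what the get_playlist() helper emits for the first playlist used in test_filtered_logging, traced by hand from the implementation above (note that dedent() preserves the template's leading newline):

playlist = TestFilteredHLSStream.get_playlist(0, [0, 1], filtered=True)
# playlist now contains (starting with a blank line):
#
# #EXTM3U
# #EXT-X-VERSION:5
# #EXT-X-TARGETDURATION:1
# #EXT-X-MEDIA-SEQUENCE:0
# #EXTINF:1.000,filtered
# stream0.ts
# #EXTINF:1.000,filtered
# stream1.ts

The filter_sequence() predicate used by the tests matches exactly these segments, whose EXTINF title is "filtered".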
Commenting here, so that it's clear where the issue is.
Adding sleeps is a last resort solution, and I don't really want this there.
I don't understand the issue if I'm being honest. The writer and main thread should both be synced properly after the await_write() call by the main thread, and the writer thread should have cleared the reader event, meaning that filter_event.is_set() should be false at this point.
The only thing that could go wrong here is the patched should_filter_sequence method, which could have returned a False value and altered the intended code path. And it's confusing to me that this has only happened in py2 so far.
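For context, the stepping pattern the tests rely on reduces to a pair of Events per thread; this is a self-contained sketch of that pattern, not code from the PR:

from threading import Event, Thread

step = Event()  # set by the main thread: allow one unit of work
done = Event()  # set by the worker thread: one unit of work has finished

def worker():
    for _ in range(3):
        step.wait()
        step.clear()
        # ... perform one write/read ...
        done.set()

t = Thread(target=worker)
t.start()
for _ in range(3):
    step.set()   # release the worker for exactly one step
    done.wait()  # block until that step has finished
    done.clear()
t.join()

If both events are handled in this strict order, the main thread can only ever observe the state after a completed step, which is why the failing assertion is so surprising.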
The test failures started happening after my last change with the patch decorator (unless we had just been lucky with successful runs before that). Is it possible that it gets replaced lazily and too late in some cases?
Why has the test_filtered_closed test not failed yet? It's basically the same setup at the beginning of both tests.
I don't see how we could have been that unlucky with previous successful runs. Lazy replacement is possible. I just ran the tests in my local (Windows) environment and the new twitch tests passed perfectly fine. It seems pretty suspicious as well that this issue is limited to the CI.
I've run this specific test on my test-account's fork 100 times and it didn't fail:
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266135453
Running all tests 100 times, however, did eventually fail (with failfast enabled to cancel the other runners):
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266138718
I will revert the last change now and then run it again.
Looks like there's something else wrong with the await_write() method or the general thread syncing method that's being used, because it now failed at a different place:
https://github.com/bastimeyer-test/streamlink-test/runs/1147228622?check_suite_focus=true#step:5:1246
that's annoying
Did some more debugging by adding detailed log messages and found the source of the issue (but not the cause yet).

In the test_filtered_no_timeout test, for example, the two segments of the first playlist are supposed to be filtered out and the two segments of the second playlist are supposed to be written. This means that when the writer tries to write the first two segments, it should discard them and lock the reader thread. Sometimes this doesn't happen though, because the writer already starts with the third sequence, the first segment of the second playlist. Since that segment should not get filtered out, the lock in the reader thread is of course not set and the test fails.

So the question is: why does the writer sometimes start with the third segment? The worker, which queues up segments for the writer, is for some reason starting with the second playlist. According to the log output, this isn't the fault of the worker, so it must be requests_mock's fault:
https://github.com/bastimeyer-test/streamlink-test/runs/1147915591?check_suite_focus=true#step:5:639

I've already tried moving the requests_mock setup from the reader thread, where it currently is, to the main thread and into the unittest.TestCase class's setUp and tearDown methods, but it didn't help. The playlist request responses are set up correctly:
https://requests-mock.readthedocs.io/en/latest/response.html#response-lists

Is it possible that this is interference from the other tests, most likely the Twitch plugin's tests, where the requests_mock doesn't get reset properly? That would explain the network error messages I was talking about earlier. Remember that running only the hls_filtered tests 100 times in a row on the CI runners worked fine (link above).
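The response-list behavior being relied on here can be demonstrated in isolation; a minimal sketch using the documented requests_mock API (the URL is arbitrary):

import requests
import requests_mock

with requests_mock.Mocker() as m:
    # each dict answers one matching request, in order; the last entry
    # keeps answering once the list is exhausted
    m.get("http://mocked/playlist.m3u8", [{"text": "first"}, {"text": "second"}])
    assert requests.get("http://mocked/playlist.m3u8").text == "first"
    assert requests.get("http://mocked/playlist.m3u8").text == "second"
    assert requests.get("http://mocked/playlist.m3u8").text == "second"

If this holds, a worker requesting the playlist repeatedly should always see the first playlist first, which points away from requests_mock and toward cross-test interference.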
I have fixed some issues with stalled threads on teardown in the hls_filtered tests by rewriting them, joining all spawned threads in the main thread's tearDown method, and waiting for their termination, but the issue still occurs, though much more rarely now, which is interesting.

When printing the names of all running threads after a clean teardown, it looks like some ThreadPoolExecutor segment-fetch threads of one of the previously run stream workers are still running, even though all other tests call close() on their spawned streams every time. Those leftover fetch threads are bleeding into the current test run and are making requests on the mocked URLs, which are the same between tests:
https://github.com/bastimeyer-test/streamlink-test/runs/1149311834?check_suite_focus=true#step:5:1418

If I change the mocked URLs in just the hls_filtered tests, then there are no failures 🎉🎉🎉, which means these bleeding threads are coming from other test runs. Those are probably test_twitch.py or test_hls.py, because those are the only ones which open HLSStreams and read them, the latter being more likely since it's run right before test_hls_filtered.py.
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266894833 (no failed tests after 30 mins of execution - canceled the workflow after that, because I didn't want to wait any longer)
I will open a PR with fixes later after cleaning up all my debugging. This will be a fix and a workaround at the same time, because I won't touch the other "faulty" tests with their bleeding threads now. This can be done later.
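A rough sketch of the teardown approach described above (a hypothetical helper, not the actual commit; the real fix is in the link below): register every spawned thread and join it in tearDown, failing loudly if it refuses to terminate.

import unittest

class _ThreadJoiningTestCase(unittest.TestCase):
    def setUp(self):
        self.threads = []  # every thread a test spawns gets registered here

    def tearDown(self):
        for thread in self.threads:
            # a bounded join keeps a stalled thread from hanging the whole run
            thread.join(timeout=5)
            self.assertFalse(thread.is_alive(), "Thread did not terminate: {0}".format(thread.name))

Making the mocked URLs unique to the hls_filtered tests is the workaround half: leftover fetch threads from other tests can then no longer hit them.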
Fix/workaround (commit ID will be different than my PR):
https://github.com/bastimeyer-test/streamlink-test/commit/8be37ef6fb145ccb83ad11b51fa0e1453b5e0cdd
All tests running 50 times in a row on all CI runners (still going after 20 minutes as I'm writing this):
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266975807