Skip to content

Add FilteredHLSStream #3187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream.hls_filtered: implement FilteredHLSStream
Adds subclasses of the HLSStreamReader and HLSStreamWriter for filtering
out sequences and ignoring the read timeout while filtering.
  • Loading branch information
bastimeyer committed Sep 21, 2020
commit 6b22aca9113ea7bb08044dcc3b10812ffe1fa2cd
53 changes: 53 additions & 0 deletions src/streamlink/stream/hls_filtered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
from threading import Event

from .hls import HLSStreamWriter, HLSStreamReader


log = logging.getLogger(__name__)


class FilteredHLSStreamWriter(HLSStreamWriter):
    """
    HLS stream writer which can skip sequences and which pauses / resumes
    the reader's output via the reader's ``filter_event``.
    """

    def should_filter_sequence(self, sequence):
        """
        Decide whether the given sequence should be filtered out.

        This base implementation never filters and always returns False.
        """
        return False

    def write(self, sequence, *args, **kwargs):
        """
        Write the sequence unless it should be filtered out.

        Filtered sequences are not written; the reader's filter event gets
        cleared instead. Writing a non-filtered sequence sets the event again.
        """
        if self.should_filter_sequence(sequence):
            # block reader thread if filtering out segments
            pause_event = self.reader.filter_event
            if pause_event.is_set():
                log.info("Filtering out segments and pausing stream output")
                pause_event.clear()
            return None

        try:
            return super(FilteredHLSStreamWriter, self).write(sequence, *args, **kwargs)
        finally:
            # unblock reader thread after writing data to the buffer,
            # regardless of whether the parent's write call has raised
            resume_event = self.reader.filter_event
            if not resume_event.is_set():
                log.info("Resuming stream output")
                resume_event.set()


class FilteredHLSStreamReader(HLSStreamReader):
    """
    HLS stream reader which keeps waiting instead of failing a read
    while its ``filter_event`` is cleared.
    """

    def __init__(self, *args, **kwargs):
        super(FilteredHLSStreamReader, self).__init__(*args, **kwargs)
        # the event starts out set, so that reads don't wait initially
        event = Event()
        event.set()
        self.filter_event = event

    def read(self, size):
        """
        Read from the parent reader and retry after an IOError once the filter event is set.

        Returns empty bytes if the buffer has been closed while waiting and
        re-raises the IOError if no buffered data is available afterwards.
        """
        while True:
            try:
                return super(FilteredHLSStreamReader, self).read(size)
            except IOError:
                # wait indefinitely until filtering ends
                self.filter_event.wait()
                # the buffer got closed in the meantime: stop reading
                if self.buffer.closed:
                    return b""
                # raise if not filtering and no data available
                if not (self.buffer.length > 0):
                    raise
                # otherwise data is available, so loop and try reading again

    def close(self):
        """Close the reader and release any read call waiting on the filter event."""
        super(FilteredHLSStreamReader, self).close()
        self.filter_event.set()
263 changes: 263 additions & 0 deletions tests/streams/test_hls_filtered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import requests_mock
from tests.mock import MagicMock, call, patch
import unittest

import itertools
from textwrap import dedent
from threading import Event, Thread

from streamlink.session import Streamlink
from streamlink.stream.hls import HLSStream
from streamlink.stream.hls_filtered import FilteredHLSStreamWriter, FilteredHLSStreamReader


class _TestSubjectFilteredHLSStreamWriter(FilteredHLSStreamWriter):
    """
    Writer test subject whose write calls are gated by events,
    so that another thread can let it perform one write at a time.
    """

    def __init__(self, *args, **kwargs):
        super(_TestSubjectFilteredHLSStreamWriter, self).__init__(*args, **kwargs)
        # set from outside to allow the next write call to proceed
        self.write_wait = Event()
        # set by the writer once a write call has finished
        self.write_done = Event()

    def write(self, *args, **kwargs):
        """Wait for the go-ahead, perform one write and signal its completion."""
        # only write once per step
        self.write_wait.wait()
        self.write_wait.clear()

        # don't write again during cleanup
        if not self.closed:
            super(_TestSubjectFilteredHLSStreamWriter, self).write(*args, **kwargs)

            # notify main thread that writing has finished
            self.write_done.set()


class _TestSubjectFilteredHLSReader(FilteredHLSStreamReader):
    """Reader test subject which sets the event-gated writer test subject as its ``__writer__``."""

    __writer__ = _TestSubjectFilteredHLSStreamWriter


class _TestSubjectReadThread(Thread):
    """
    Run the reader on a separate thread, so that each read can be controlled from within the main thread
    """

    # upper bound in seconds for waiting on this thread's termination during cleanup
    cleanup_join_timeout = 1

    def __init__(self, segments, playlists):
        """
        Set up the mocked playlist and segment requests and open the reader.

        :param segments: list of segment contents, mocked at ``url_segment`` by their index
        :param playlists: list of playlist texts, returned one after another by ``url_playlist``
        """
        Thread.__init__(self)
        self.daemon = True

        # maps each mocked URL to its registered request mock
        self.mocks = mocks = {}
        self.mock = requests_mock.Mocker()
        self.mock.start()

        def addmock(method, url, *args, **kwargs):
            mocks[url] = method(url, *args, **kwargs)

        addmock(self.mock.get, TestFilteredHLSStream.url_playlist, [{"text": p} for p in playlists])
        for i, segment in enumerate(segments):
            addmock(self.mock.get, TestFilteredHLSStream.url_segment.format(i), content=segment)

        session = Streamlink()
        session.set_option("hls-live-edge", 2)
        session.set_option("hls-timeout", 0)
        session.set_option("stream-timeout", 0)

        # set from outside to allow the next read call to proceed
        self.read_wait = Event()
        # set by this thread once a read call has finished
        self.read_done = Event()
        # data of each successful read call
        self.data = []
        # the IOError which has ended the read loop, if any
        self.error = None

        self.stream = HLSStream(session, TestFilteredHLSStream.url_playlist)
        self.reader = _TestSubjectFilteredHLSReader(self.stream)
        self.reader.open()

    def run(self):
        """Perform one read call per ``read_wait`` signal until the reader is closed or a read fails."""
        while True:
            # only read once per step
            self.read_wait.wait()
            self.read_wait.clear()

            # don't read again during cleanup
            if self.reader.closed:
                return

            try:
                data = self.reader.read(-1)
                self.data.append(data)
            except IOError as err:
                self.error = err
                return
            finally:
                # notify main thread that reading has finished
                self.read_done.set()

    def cleanup(self):
        """
        Close the reader, release the waiting threads and wait for this thread to terminate.

        The request mock is stopped last, so that it stays active while this thread is still running.
        """
        try:
            self.reader.close()
            # make sure that the write and read threads halt on cleanup
            self.reader.writer.write_wait.set()
            self.read_wait.set()
            # don't let a still running read thread bleed into subsequent tests;
            # the timeout prevents a stalled thread from blocking the teardown forever
            if self.is_alive():
                self.join(self.cleanup_join_timeout)
        finally:
            self.mock.stop()

    def await_write(self):
        """Let the writer perform one write call and block until it has finished."""
        writer = self.reader.writer
        if not writer.closed:
            # make one write call and wait until write call has finished
            writer.write_wait.set()
            writer.write_done.wait()
            writer.write_done.clear()

    def await_read(self):
        """Let this thread perform one read call and block until it has finished."""
        if not self.reader.closed:
            # make one read call and wait until read call has finished
            self.read_wait.set()
            self.read_done.wait()
            self.read_done.clear()


@patch("streamlink.stream.hls.HLSStreamWorker.wait", MagicMock(return_value=True))
class TestFilteredHLSStream(unittest.TestCase):
url_playlist = "http://mocked/path/playlist.m3u8"
url_segment = "http://mocked/path/stream{0}.ts"

@classmethod
def get_segments(cls, num):
return ["[{0}]".format(i).encode("ascii") for i in range(num)]

@classmethod
def get_playlist(cls, media_sequence, items, filtered=False, end=False):
playlist = dedent("""
#EXTM3U
#EXT-X-VERSION:5
#EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:{0}
""".format(media_sequence))

for item in items:
playlist += "#EXTINF:1.000,{1}\nstream{0}.ts\n".format(item, "filtered" if filtered else "live")

if end:
playlist += "#EXT-X-ENDLIST\n"

return playlist

@classmethod
def filter_sequence(cls, sequence):
return sequence.segment.title == "filtered"

def subject(self, segments, playlists):
    """
    Create and start the read thread and register its cleanup.

    :return: tuple of the read thread, its reader and the reader's writer
    """
    read_thread = _TestSubjectReadThread(segments, playlists)
    self.addCleanup(read_thread.cleanup)
    read_thread.start()

    reader = read_thread.reader
    return read_thread, reader, reader.writer

@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
@patch("streamlink.stream.hls_filtered.log")
def test_filtered_logging(self, mock_log):
    """Alternate between filtered and live playlists and check the log messages, event state and read data."""
    segments = self.get_segments(8)
    playlists = [
        self.get_playlist(0, [0, 1], filtered=True),
        self.get_playlist(2, [2, 3], filtered=False),
        self.get_playlist(4, [4, 5], filtered=True),
        self.get_playlist(6, [6, 7], filtered=False, end=True)
    ]
    thread, reader, writer = self.subject(segments, playlists)

    self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

    for cycle in range(2):
        # each cycle logs one "pausing" message followed by one "resuming" message
        pause_index = cycle * 2
        resume_index = pause_index + 1

        # two filtered segments
        for _ in range(2):
            thread.await_write()
        self.assertEqual(len(mock_log.info.mock_calls), pause_index + 1)
        self.assertEqual(
            mock_log.info.mock_calls[pause_index],
            call("Filtering out segments and pausing stream output")
        )
        self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")

        # two live segments
        for _ in range(2):
            thread.await_write()
        self.assertEqual(len(mock_log.info.mock_calls), resume_index + 1)
        self.assertEqual(mock_log.info.mock_calls[resume_index], call("Resuming stream output"))
        self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

        thread.await_read()

    expected = b"".join(segments[2:4] + segments[6:8])
    self.assertEqual(b"".join(thread.data), expected, "Correctly filters out segments")

    for index in range(len(segments)):
        segment_mock = thread.mocks[TestFilteredHLSStream.url_segment.format(index)]
        self.assertTrue(segment_mock.called, "Downloads all segments")

# don't patch should_filter_sequence here (it always returns False)
def test_not_filtered(self):
    """Check that segments with a "filtered" title are still read when should_filter_sequence is not patched."""
    segments = self.get_segments(2)
    playlists = [self.get_playlist(0, [0, 1], filtered=True, end=True)]
    thread, reader, writer = self.subject(segments, playlists)

    # write both segments, then read them at once
    for _ in range(2):
        thread.await_write()
    thread.await_read()

    self.assertEqual(b"".join(thread.data), b"".join(segments[0:2]), "Does not filter by default")

@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
def test_filtered_timeout(self):
    """Check that reading raises an IOError when not filtering and no data is available."""
    segments = self.get_segments(2)
    playlists = [self.get_playlist(0, [0, 1], filtered=False, end=True)]
    thread, reader, writer = self.subject(segments, playlists)

    # write and read only the first segment
    thread.await_write()
    thread.await_read()
    self.assertEqual(thread.data, segments[:1], "Has read the first segment")

    # simulate a timeout by having an empty buffer
    # timeout value is set to 0
    thread.await_read()
    self.assertIsInstance(thread.error, IOError, "Raises a timeout error when no data is available to read")

# Checks that a read call doesn't time out while segments are being filtered out.
# NOTE: the body of this test continues further below, after the interleaved review comments.
@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
def test_filtered_no_timeout(self):
# the first playlist's segments are titled "filtered", the second and final playlist's segments "live"
segments = self.get_segments(4)
thread, reader, writer = self.subject(segments, [
self.get_playlist(0, [0, 1], filtered=True),
self.get_playlist(2, [2, 3], filtered=False, end=True)
])

self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")

# let the writer handle both segments of the first (filtered) playlist
thread.await_write()
thread.await_write()
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting here, so that it's clear where the issue is.

Adding sleeps is a last resort solution, and I don't really want this there.

I don't understand the issue if I'm being honest. The writer and main thread should both be synced properly after the await_write() call by the main thread, and the writer thread should have cleared the reader event, meaning that filter_event.is_set() should be false at this point.

The only thing that could go wrong here is the patched should_filter_sequence method, which could have returned a False value and altered the intended code path. And it's confusing to me that this has only happened in py2 so far.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test failure has started to happen after my last change with the patch decorator (if we've not been unlucky with successful runs prior to that). Is it possible that it gets replaced lazily and too late in some cases?

Why has the test_filtered_closed test not failed yet? It's basically the same setup at the beginning of both tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how we could have been that unlucky with previous successful runs. Lazy replacement is possible. I just ran the tests in my local (Windows) environment and the new twitch tests passed perfectly fine. It seems pretty suspicious as well that this issue is limited to the CI.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've run this specific test on my test-account's fork 100 times and it didn't fail:
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266135453

Running all tests 100 times however did fail eventually (with failfast enabled to cancel the other runners):
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266138718

I will revert the last change now and then run it again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there's something else wrong with the await_write() method or the general thread syncing method that's being used, because it now failed at a different place:
https://github.com/bastimeyer-test/streamlink-test/runs/1147228622?check_suite_focus=true#step:5:1246
that's annoying

Copy link
Member Author

@bastimeyer bastimeyer Sep 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some more debugging by adding detailed log messages and found the source of the issue (but not the cause yet).

In the test_filtered_no_timeout test for example, the two segments of the first playlist are supposed to be filtered out and the two segments of the second playlist are supposed to be written. This means that when trying to write the first two segments, it should ignore them and lock the reader thread. Sometimes this isn't done though, because the writer already starts with the third sequence, the first segment of the second playlist. Since this segment should not get filtered out, the lock in the reader thread is of course not set and the test fails.

So the question is, why does the writer start writing the third segment sometimes? The worker, which is putting / queuing up segments into the writer is for some reason starting with the second playlist.

According to the log output, this isn't the fault of the worker, so it must be requests_mock's fault:
https://github.com/bastimeyer-test/streamlink-test/runs/1147915591?check_suite_focus=true#step:5:639

I've already tried moving the requests_mock setup from the reader thread where it currently is to the main thread and into the unittest.TestCase class's setUp and tearDown methods, but it didn't help. The playlist request responses are set up correctly:
https://requests-mock.readthedocs.io/en/latest/response.html#response-lists

Is it possible that it's interference with the other tests, most likely the Twitch plugin's tests, where the requests_mock doesn't get reset properly? That would explain the network error messages I was talking about earlier. Remember that running only the hls_filtered tests 100 times in a row by the CI runners worked fine (link above).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed some issues with stalled threads on teardown in the hls_filtered tests by rewriting them and joining all spawned threads in the main thread's tearDown method and waiting for their termination, but the issue still occurs, now much rarer though, which is interesting.

When printing the names of all running threads after a clean teardown, it looks like some ThreadPoolExecutor segment-fetch-threads of one of the previously run stream workers are still running, even though all other tests are calling close() on their spawned streams every time. Those leftover fetch-threads are bleeding into the current test run and are making the requests on the mocked URLs, which are the same between tests:
https://github.com/bastimeyer-test/streamlink-test/runs/1149311834?check_suite_focus=true#step:5:1418

If I change the mocked URLs in just the hls_filtered tests, then there are no failures 🎉🎉🎉, which means these bleeding threads are coming from other test runs. Those are probably test_twitch.py or test_hls.py, because those are the only ones which are opening HLSStreams and reading them, the latter being more likely since it's run right before test_hls_filtered.py.
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266894833 (no failed tests after 30 mins of execution - canceled the workflow after that, because I didn't want to wait any longer)

I will open a PR with fixes later after cleaning up all my debugging. This will be a fix and a workaround at the same time, because I won't touch the other "faulty" tests with their bleeding threads now. This can be done later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix/workaround (commit ID will be different than my PR):
https://github.com/bastimeyer-test/streamlink-test/commit/8be37ef6fb145ccb83ad11b51fa0e1453b5e0cdd
All tests running 50 times in a row on all CI runners (still running after 20 minutes as I'm writing this):
https://github.com/bastimeyer-test/streamlink-test/actions/runs/266975807


# (continuation of test_filtered_no_timeout: the filter event has been cleared at this point)
# make reader read (no data available yet)
# read_wait is set directly instead of calling await_read(), so that the main thread doesn't block here
thread.read_wait.set()
# once data becomes available, the reader continues reading
thread.await_write()
self.assertTrue(reader.filter_event.is_set(), "Reader is not waiting anymore")

# now wait for the pending read call to finish
thread.read_done.wait()
thread.read_done.clear()
self.assertFalse(thread.error, "Doesn't time out when filtering")
self.assertEqual(thread.data, segments[2:3], "Reads next available buffer data")

# write and read the last segment
thread.await_write()
thread.await_read()
self.assertEqual(thread.data, segments[2:4])

@patch("streamlink.stream.hls_filtered.FilteredHLSStreamWriter.should_filter_sequence", new=filter_sequence)
def test_filtered_closed(self):
    """Check that a read call which waits for filtering to end returns empty data once the stream gets closed."""
    segments = self.get_segments(2)
    playlists = [self.get_playlist(0, [0, 1], filtered=True)]
    thread, reader, writer = self.subject(segments, playlists)

    self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
    thread.await_write()
    self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")

    # make reader read (no data available yet)
    thread.read_wait.set()

    # close stream while reader is waiting for filtering to end
    reader.close()
    thread.read_done.wait()
    thread.read_done.clear()
    self.assertEqual(thread.data, [b""], "Stops reading on stream close")