1
+ import logging
1
2
import unittest
3
+ from functools import partial
2
4
3
- from streamlink .plugins .twitch import Twitch
5
+ from streamlink .plugins .twitch import Twitch , TwitchHLSStream
6
+
7
+ import requests_mock
8
+
9
+ from streamlink .session import Streamlink
10
+ from tests .resources import text
11
+
12
+
13
+ log = logging .getLogger (__name__ )
4
14
5
15
6
16
class TestPluginTwitch (unittest .TestCase ):
@@ -21,3 +31,117 @@ def test_can_handle_url_negative(self):
21
31
]
22
32
for url in should_not_match :
23
33
self .assertFalse (Twitch .can_handle_url (url ))
34
+
35
+
36
+ class TestTwitchHLSStream (unittest .TestCase ):
37
+ scte35_out = "#EXT-X-DISCONTINUITY\n #EXT-X-SCTE35-OUT\n "
38
+ scte35_out_cont = "#EXT-X-SCTE35-OUT-CONT\n "
39
+ scte35_in = "#EXT-X-DISCONTINUITY\n #EXT-X-SCTE35-IN\n "
40
+ segment = "#EXTINF:1.000,\n stream{0}.ts\n "
41
+
42
+ def getMasterPlaylist (self ):
43
+ with text ("hls/test_master.m3u8" ) as pl :
44
+ return pl .read ()
45
+
46
+ def getPlaylist (self , media_sequence , items ):
47
+ playlist = """
48
+ #EXTM3U
49
+ #EXT-X-VERSION:5
50
+ #EXT-X-TARGETDURATION:1
51
+ #EXT-X-MEDIA-SEQUENCE:{0}
52
+ """ .format (media_sequence )
53
+
54
+ for item in items :
55
+ if type (item ) != int :
56
+ playlist += item
57
+ else :
58
+ playlist += self .segment .format (item )
59
+
60
+ return playlist
61
+
62
+ def start_streamlink (self , kwargs = None ):
63
+ kwargs = kwargs or {}
64
+ log .info ("Executing streamlink" )
65
+ streamlink = Streamlink ()
66
+
67
+ streamlink .set_option ("hls-live-edge" , 4 )
68
+ streamlink .set_option ("twitch-disable-ads" , True )
69
+
70
+ masterStream = TwitchHLSStream .parse_variant_playlist (
71
+ streamlink ,
72
+ "http://mocked/path/master.m3u8" ,
73
+ ** kwargs
74
+ )
75
+ stream = masterStream ["1080p (source)" ].open ()
76
+ data = b"" .join (iter (partial (stream .read , 8192 ), b"" ))
77
+ stream .close ()
78
+ log .info ("End of streamlink execution" )
79
+ return data
80
+
81
+ def get_result (self , streams , playlists ):
82
+ with requests_mock .Mocker () as mock :
83
+ mock .get ("http://mocked/path/master.m3u8" , text = self .getMasterPlaylist ())
84
+ mock .get ("http://mocked/path/playlist.m3u8" , [{"text" : playlist } for playlist in playlists ])
85
+ for i , stream in enumerate (streams ):
86
+ mock .get ("http://mocked/path/stream{0}.ts" .format (i ), content = stream )
87
+ return self .start_streamlink ()
88
+
89
+ def test_hls_scte35_start_with_end (self ):
90
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (12 )]
91
+ playlists = [
92
+ self .getPlaylist (0 , [self .scte35_out , 0 , 1 , 2 , 3 ]),
93
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]),
94
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]) + "#EXT-X-ENDLIST\n "
95
+ ]
96
+ result = self .get_result (streams , playlists )
97
+
98
+ expected = b'' .join (streams [4 :12 ])
99
+ self .assertEqual (expected , result )
100
+
101
+ def test_hls_scte35_no_start (self ):
102
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (8 )]
103
+ playlists = [
104
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
105
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]) + "#EXT-X-ENDLIST\n "
106
+ ]
107
+ result = self .get_result (streams , playlists )
108
+
109
+ expected = b'' .join (streams [0 :8 ])
110
+ self .assertEqual (expected , result )
111
+
112
+ def test_hls_scte35_no_start_with_cont (self ):
113
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (8 )]
114
+ playlists = [
115
+ self .getPlaylist (0 , [self .scte35_out_cont , 0 , 1 , 2 , 3 ]),
116
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]) + "#EXT-X-ENDLIST\n "
117
+ ]
118
+ result = self .get_result (streams , playlists )
119
+
120
+ expected = b'' .join (streams [4 :8 ])
121
+ self .assertEqual (expected , result )
122
+
123
+ def test_hls_scte35_no_end (self ):
124
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (12 )]
125
+ playlists = [
126
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
127
+ self .getPlaylist (4 , [self .scte35_out , 4 , 5 , 6 , 7 ]),
128
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]) + "#EXT-X-ENDLIST\n "
129
+ ]
130
+ result = self .get_result (streams , playlists )
131
+
132
+ expected = b'' .join (streams [0 :4 ])
133
+ self .assertEqual (expected , result )
134
+
135
+ def test_hls_scte35_in_between (self ):
136
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (20 )]
137
+ playlists = [
138
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
139
+ self .getPlaylist (4 , [4 , 5 , self .scte35_out , 6 , 7 ]),
140
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]),
141
+ self .getPlaylist (12 , [12 , 13 , self .scte35_in , 14 , 15 ]),
142
+ self .getPlaylist (16 , [16 , 17 , 18 , 19 ]) + "#EXT-X-ENDLIST\n "
143
+ ]
144
+ result = self .get_result (streams , playlists )
145
+
146
+ expected = b'' .join (streams [0 :6 ]) + b'' .join (streams [14 :20 ])
147
+ self .assertEqual (expected , result )
0 commit comments