1
+ import logging
1
2
import unittest
3
+ from functools import partial
2
4
3
- from streamlink .plugins .twitch import Twitch
5
+ from streamlink .plugins .twitch import Twitch , TwitchHLSStream
6
+
7
+ import requests_mock
8
+
9
+ from streamlink .session import Streamlink
10
+ from tests .resources import text
11
+
12
+
13
+ log = logging .getLogger (__name__ )
4
14
5
15
6
16
class TestPluginTwitch (unittest .TestCase ):
@@ -21,3 +31,118 @@ def test_can_handle_url_negative(self):
21
31
]
22
32
for url in should_not_match :
23
33
self .assertFalse (Twitch .can_handle_url (url ))
34
+
35
+
36
+ class TestTwitchHLSStream (unittest .TestCase ):
37
+ scte35_out = "#EXT-X-DISCONTINUITY\n #EXT-X-SCTE35-OUT\n "
38
+ scte35_out_cont = "#EXT-X-SCTE35-OUT-CONT\n "
39
+ scte35_in = "#EXT-X-DISCONTINUITY\n #EXT-X-SCTE35-IN\n "
40
+ segment = "#EXTINF:1.000,\n stream{0}.ts\n "
41
+
42
+ def getMasterPlaylist (self ):
43
+ with text ("hls/test_master.m3u8" ) as pl :
44
+ return pl .read ()
45
+
46
+ def getPlaylist (self , media_sequence , items ):
47
+ playlist = """
48
+ #EXTM3U
49
+ #EXT-X-VERSION:5
50
+ #EXT-X-TARGETDURATION:1
51
+ #EXT-X-MEDIA-SEQUENCE:{0}
52
+ """ .format (media_sequence )
53
+
54
+ for item in items :
55
+ if type (item ) != int :
56
+ playlist += item
57
+ else :
58
+ playlist += self .segment .format (item )
59
+
60
+ return playlist
61
+
62
+ def start_streamlink (self , kwargs = None ):
63
+ kwargs = kwargs or {}
64
+ log .info ("Executing streamlink" )
65
+ streamlink = Streamlink ()
66
+
67
+ streamlink .set_option ("hls-live-edge" , 4 )
68
+ # streamlink.set_option("twitch-disable-ads", True)
69
+
70
+ masterStream = TwitchHLSStream .parse_variant_playlist (
71
+ streamlink ,
72
+ "http://mocked/path/master.m3u8" ,
73
+ options = {"disable-ads" : True },
74
+ ** kwargs
75
+ )
76
+ stream = masterStream ["1080p (source)" ].open ()
77
+ data = b"" .join (iter (partial (stream .read , 8192 ), b"" ))
78
+ stream .close ()
79
+ log .info ("End of streamlink execution" )
80
+ return data
81
+
82
+ def get_result (self , streams , playlists ):
83
+ with requests_mock .Mocker () as mock :
84
+ mock .get ("http://mocked/path/master.m3u8" , text = self .getMasterPlaylist ())
85
+ mock .get ("http://mocked/path/playlist.m3u8" , [{"text" : playlist } for playlist in playlists ])
86
+ for i , stream in enumerate (streams ):
87
+ mock .get ("http://mocked/path/stream{0}.ts" .format (i ), content = stream )
88
+ return self .start_streamlink ()
89
+
90
+ def test_hls_scte35_start_with_end (self ):
91
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (12 )]
92
+ playlists = [
93
+ self .getPlaylist (0 , [self .scte35_out , 0 , 1 , 2 , 3 ]),
94
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]),
95
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]) + "#EXT-X-ENDLIST\n "
96
+ ]
97
+ result = self .get_result (streams , playlists )
98
+
99
+ expected = b'' .join (streams [4 :12 ])
100
+ self .assertEqual (expected , result )
101
+
102
+ def test_hls_scte35_no_start (self ):
103
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (8 )]
104
+ playlists = [
105
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
106
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]) + "#EXT-X-ENDLIST\n "
107
+ ]
108
+ result = self .get_result (streams , playlists )
109
+
110
+ expected = b'' .join (streams [0 :8 ])
111
+ self .assertEqual (expected , result )
112
+
113
+ def test_hls_scte35_no_start_with_cont (self ):
114
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (8 )]
115
+ playlists = [
116
+ self .getPlaylist (0 , [self .scte35_out_cont , 0 , 1 , 2 , 3 ]),
117
+ self .getPlaylist (4 , [self .scte35_in , 4 , 5 , 6 , 7 ]) + "#EXT-X-ENDLIST\n "
118
+ ]
119
+ result = self .get_result (streams , playlists )
120
+
121
+ expected = b'' .join (streams [4 :8 ])
122
+ self .assertEqual (expected , result )
123
+
124
+ def test_hls_scte35_no_end (self ):
125
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (12 )]
126
+ playlists = [
127
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
128
+ self .getPlaylist (4 , [self .scte35_out , 4 , 5 , 6 , 7 ]),
129
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]) + "#EXT-X-ENDLIST\n "
130
+ ]
131
+ result = self .get_result (streams , playlists )
132
+
133
+ expected = b'' .join (streams [0 :4 ])
134
+ self .assertEqual (expected , result )
135
+
136
+ def test_hls_scte35_in_between (self ):
137
+ streams = ["[{0}]" .format (i ).encode ("ascii" ) for i in range (20 )]
138
+ playlists = [
139
+ self .getPlaylist (0 , [0 , 1 , 2 , 3 ]),
140
+ self .getPlaylist (4 , [4 , 5 , self .scte35_out , 6 , 7 ]),
141
+ self .getPlaylist (8 , [8 , 9 , 10 , 11 ]),
142
+ self .getPlaylist (12 , [12 , 13 , self .scte35_in , 14 , 15 ]),
143
+ self .getPlaylist (16 , [16 , 17 , 18 , 19 ]) + "#EXT-X-ENDLIST\n "
144
+ ]
145
+ result = self .get_result (streams , playlists )
146
+
147
+ expected = b'' .join (streams [0 :6 ]) + b'' .join (streams [14 :20 ])
148
+ self .assertEqual (expected , result )
0 commit comments