19
19
import pathlib
20
20
import subprocess
21
21
import tempfile
22
+ import threading
22
23
import time
23
24
from subprocess import Popen , PIPE
24
25
from typing import (
@@ -164,7 +165,7 @@ class _PipedCompressionProgram(io.IOBase):
164
165
165
166
def __init__ ( # noqa: C901
166
167
self ,
167
- file : Union [ str , bytes , os . PathLike [ str ], os . PathLike [ bytes ], BinaryIO ] ,
168
+ file : BinaryIO ,
168
169
mode = "rb" ,
169
170
compresslevel : Optional [int ] = None ,
170
171
threads : Optional [int ] = None ,
@@ -188,26 +189,15 @@ def __init__( # noqa: C901
188
189
f"Mode is '{ mode } ', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'"
189
190
)
190
191
filepath : Union [str , bytes ] = ""
191
- if isinstance (file , (str , bytes , os .PathLike )):
192
- filepath = os .fspath (file )
193
- elif isinstance (file , io .IOBase ):
194
- # not supported (yet) for reading.
195
- if "r" in mode :
196
- raise OSError (
197
- f"File object not supported for reading through "
198
- f"{ self .__class__ .__name__ } ."
199
- )
200
- if hasattr (file , "name" ):
201
- filepath = file .name
192
+ if hasattr (file , "name" ):
193
+ filepath = file .name
202
194
if (
203
195
compresslevel is not None
204
196
and compresslevel not in program_settings .acceptable_compression_levels
205
197
):
206
198
raise ValueError (
207
199
f"compresslevel must be in { program_settings .acceptable_compression_levels } ."
208
200
)
209
- if isinstance (filepath , bytes ) and sys .platform == "win32" :
210
- filepath = filepath .decode ()
211
201
self .name : str = str (filepath )
212
202
self ._mode : str = mode
213
203
self ._stderr = tempfile .TemporaryFile ("w+b" )
@@ -233,36 +223,39 @@ def __init__( # noqa: C901
233
223
if sys .platform != "win32" :
234
224
close_fds = True
235
225
226
+ self .fileobj = file
227
+ self .in_pipe = None
228
+ self .in_thread = None
229
+ self ._feeding = True
236
230
if "r" in mode :
237
- self ._program_args += ["-c" , "-d" , file ] # type: ignore
238
- self .outfile : Optional [int ] = None
231
+ self ._program_args += ["-c" , "-d" ] # type: ignore
239
232
self .process = subprocess .Popen (
240
233
self ._program_args ,
241
234
stderr = self ._stderr ,
242
235
stdout = PIPE ,
236
+ stdin = PIPE ,
243
237
close_fds = close_fds ,
244
238
) # type: ignore
239
+ assert self .process .stdin is not None
240
+ self .in_pipe = self .process .stdin
241
+ self .in_thread = threading .Thread (target = self ._feed_pipe )
242
+ self .in_thread .start ()
245
243
self ._file : BinaryIO = self .process .stdout # type: ignore
246
244
self ._wait_for_output_or_process_exit ()
247
245
self ._raise_if_error ()
248
246
else :
249
247
if compresslevel is not None :
250
248
self ._program_args += ["-" + str (compresslevel )]
251
- # complex file handlers : pass through
252
- if hasattr (file , "fileno" ):
253
- self .outfile = file .fileno ()
254
- else :
255
- self .outfile = os .open (file , os .O_WRONLY ) # type: ignore
256
249
try :
257
250
self .process = Popen (
258
251
self ._program_args ,
259
252
stderr = self ._stderr ,
260
253
stdin = PIPE ,
261
- stdout = self .outfile ,
254
+ stdout = self .fileobj ,
262
255
close_fds = close_fds ,
263
256
) # type: ignore
264
257
except OSError :
265
- os .close (self . outfile )
258
+ file .close ()
266
259
raise
267
260
assert self .process .stdin is not None
268
261
self ._file = self .process .stdin # type: ignore
@@ -277,6 +270,14 @@ def __repr__(self):
277
270
f"threads={ self ._threads } )"
278
271
)
279
272
273
+ def _feed_pipe (self ):
274
+ while self ._feeding :
275
+ chunk = self .fileobj .read (BUFFER_SIZE )
276
+ if chunk == b"" :
277
+ self .in_pipe .close ()
278
+ return
279
+ self .in_pipe .write (chunk )
280
+
280
281
def write (self , arg : bytes ) -> int :
281
282
return self ._file .write (arg )
282
283
@@ -311,10 +312,10 @@ def close(self) -> None:
311
312
self ._stderr .close ()
312
313
return
313
314
check_allowed_code_and_message = False
314
- if self .outfile : # Opened for writing.
315
+ if self .fileobj :
315
316
self ._file .close ()
316
317
self .process .wait ()
317
- os . close (self . outfile )
318
+ self . fileobj . close ()
318
319
else :
319
320
retcode = self .process .poll ()
320
321
if retcode is None :
@@ -592,7 +593,7 @@ def _detect_format_from_content(fileobj: BinaryIO) -> Optional[str]:
592
593
"""
593
594
if hasattr (fileobj , "peek" ):
594
595
bs = fileobj .peek (6 )
595
- elif fileobj .seekable ():
596
+ elif hasattr ( fileobj , "seekable" ) and fileobj .seekable ():
596
597
current_pos = fileobj .tell ()
597
598
bs = fileobj .read (6 )
598
599
fileobj .seek (current_pos )
0 commit comments