Skip to content

Commit ec85410

Browse files
committed
Use a thread to feed an inpipe for reading
1 parent 9a9838f commit ec85410

File tree

1 file changed

+27
-26
lines changed

1 file changed

+27
-26
lines changed

src/xopen/__init__.py

Lines changed: 27 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
import pathlib
2020
import subprocess
2121
import tempfile
22+
import threading
2223
import time
2324
from subprocess import Popen, PIPE
2425
from typing import (
@@ -164,7 +165,7 @@ class _PipedCompressionProgram(io.IOBase):
164165

165166
def __init__( # noqa: C901
166167
self,
167-
file: Union[str, bytes, os.PathLike[str], os.PathLike[bytes], BinaryIO],
168+
file: BinaryIO,
168169
mode="rb",
169170
compresslevel: Optional[int] = None,
170171
threads: Optional[int] = None,
@@ -188,26 +189,15 @@ def __init__( # noqa: C901
188189
f"Mode is '{mode}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'"
189190
)
190191
filepath: Union[str, bytes] = ""
191-
if isinstance(file, (str, bytes, os.PathLike)):
192-
filepath = os.fspath(file)
193-
elif isinstance(file, io.IOBase):
194-
# not supported (yet) for reading.
195-
if "r" in mode:
196-
raise OSError(
197-
f"File object not supported for reading through "
198-
f"{self.__class__.__name__}."
199-
)
200-
if hasattr(file, "name"):
201-
filepath = file.name
192+
if hasattr(file, "name"):
193+
filepath = file.name
202194
if (
203195
compresslevel is not None
204196
and compresslevel not in program_settings.acceptable_compression_levels
205197
):
206198
raise ValueError(
207199
f"compresslevel must be in {program_settings.acceptable_compression_levels}."
208200
)
209-
if isinstance(filepath, bytes) and sys.platform == "win32":
210-
filepath = filepath.decode()
211201
self.name: str = str(filepath)
212202
self._mode: str = mode
213203
self._stderr = tempfile.TemporaryFile("w+b")
@@ -233,36 +223,39 @@ def __init__( # noqa: C901
233223
if sys.platform != "win32":
234224
close_fds = True
235225

226+
self.fileobj = file
227+
self.in_pipe = None
228+
self.in_thread = None
229+
self._feeding = True
236230
if "r" in mode:
237-
self._program_args += ["-c", "-d", file] # type: ignore
238-
self.outfile: Optional[int] = None
231+
self._program_args += ["-c", "-d"] # type: ignore
239232
self.process = subprocess.Popen(
240233
self._program_args,
241234
stderr=self._stderr,
242235
stdout=PIPE,
236+
stdin=PIPE,
243237
close_fds=close_fds,
244238
) # type: ignore
239+
assert self.process.stdin is not None
240+
self.in_pipe = self.process.stdin
241+
self.in_thread = threading.Thread(target=self._feed_pipe)
242+
self.in_thread.start()
245243
self._file: BinaryIO = self.process.stdout # type: ignore
246244
self._wait_for_output_or_process_exit()
247245
self._raise_if_error()
248246
else:
249247
if compresslevel is not None:
250248
self._program_args += ["-" + str(compresslevel)]
251-
# complex file handlers : pass through
252-
if hasattr(file, "fileno"):
253-
self.outfile = file.fileno()
254-
else:
255-
self.outfile = os.open(file, os.O_WRONLY) # type: ignore
256249
try:
257250
self.process = Popen(
258251
self._program_args,
259252
stderr=self._stderr,
260253
stdin=PIPE,
261-
stdout=self.outfile,
254+
stdout=self.fileobj,
262255
close_fds=close_fds,
263256
) # type: ignore
264257
except OSError:
265-
os.close(self.outfile)
258+
file.close()
266259
raise
267260
assert self.process.stdin is not None
268261
self._file = self.process.stdin # type: ignore
@@ -277,6 +270,14 @@ def __repr__(self):
277270
f"threads={self._threads})"
278271
)
279272

273+
def _feed_pipe(self):
274+
while self._feeding:
275+
chunk = self.fileobj.read(BUFFER_SIZE)
276+
if chunk == b"":
277+
self.in_pipe.close()
278+
return
279+
self.in_pipe.write(chunk)
280+
280281
def write(self, arg: bytes) -> int:
281282
return self._file.write(arg)
282283

@@ -311,10 +312,10 @@ def close(self) -> None:
311312
self._stderr.close()
312313
return
313314
check_allowed_code_and_message = False
314-
if self.outfile: # Opened for writing.
315+
if self.fileobj:
315316
self._file.close()
316317
self.process.wait()
317-
os.close(self.outfile)
318+
self.fileobj.close()
318319
else:
319320
retcode = self.process.poll()
320321
if retcode is None:
@@ -592,7 +593,7 @@ def _detect_format_from_content(fileobj: BinaryIO) -> Optional[str]:
592593
"""
593594
if hasattr(fileobj, "peek"):
594595
bs = fileobj.peek(6)
595-
elif fileobj.seekable():
596+
elif hasattr(fileobj, "seekable") and fileobj.seekable():
596597
current_pos = fileobj.tell()
597598
bs = fileobj.read(6)
598599
fileobj.seek(current_pos)

0 commit comments

Comments
 (0)