Skip to content

Commit 45bd313

Browse files
authored
Merge pull request #154 from pycompression/variousstuff
Various small fixes
2 parents 62353e7 + 4cd0887 commit 45bd313

File tree

2 files changed

+89
-64
lines changed

2 files changed

+89
-64
lines changed

src/xopen/__init__.py

Lines changed: 72 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
Dict,
2626
Optional,
2727
Union,
28-
TextIO,
2928
IO,
3029
overload,
3130
BinaryIO,
@@ -40,6 +39,9 @@
4039
BUFFER_SIZE = max(io.DEFAULT_BUFFER_SIZE, 128 * 1024)
4140

4241
XOPEN_DEFAULT_GZIP_COMPRESSION = 1
42+
XOPEN_DEFAULT_BZ2_COMPRESSION = 9
43+
XOPEN_DEFAULT_XZ_COMPRESSION = 6
44+
XOPEN_DEFAULT_ZST_COMPRESSION = 3
4345

4446
igzip: Optional[ModuleType]
4547
isal_zlib: Optional[ModuleType]
@@ -163,7 +165,7 @@ class _PipedCompressionProgram(io.IOBase):
163165
Read and write compressed files by running an external process and piping into it.
164166
"""
165167

166-
def __init__( # noqa: C901
168+
def __init__(
167169
self,
168170
filename: FileOrPath,
169171
mode="rb",
@@ -197,9 +199,10 @@ def __init__( # noqa: C901
197199
raise ValueError(
198200
f"compresslevel must be in {program_settings.acceptable_compression_levels}."
199201
)
202+
self._compresslevel = compresslevel
200203
self.fileobj, self.closefd = _file_or_path_to_binary_stream(filename, mode)
201-
filepath = _filepath_from_path_or_filelike(filename)
202-
self.name: str = str(filepath)
204+
self._path = _filepath_from_path_or_filelike(filename)
205+
self.name: str = str(self._path)
203206
self._mode: str = mode
204207
self._stderr = tempfile.TemporaryFile("w+b")
205208
self._threads_flag: Optional[str] = program_settings.threads_flag
@@ -213,7 +216,10 @@ def __init__( # noqa: C901
213216
threads = min(_available_cpu_count(), 4)
214217
self._threads = threads
215218

216-
if threads != 0 and self._threads_flag is not None:
219+
self._open_process()
220+
221+
def _open_process(self):
222+
if self._threads != 0 and self._threads_flag is not None:
217223
self._program_args += [f"{self._threads_flag}{self._threads}"]
218224

219225
# Setting close_fds to True in the Popen arguments is necessary due to
@@ -227,12 +233,12 @@ def __init__( # noqa: C901
227233
self.in_pipe = None
228234
self.in_thread = None
229235
self._feeding = True
230-
if "r" in mode:
236+
if "r" in self._mode:
231237
self._program_args += ["-c", "-d"] # type: ignore
232238
stdout = subprocess.PIPE
233239
else:
234-
if compresslevel is not None:
235-
self._program_args += ["-" + str(compresslevel)]
240+
if self._compresslevel is not None:
241+
self._program_args += ["-" + str(self._compresslevel)]
236242
stdout = self.fileobj # type: ignore
237243
try:
238244
self.process = subprocess.Popen(
@@ -247,7 +253,7 @@ def __init__( # noqa: C901
247253
self.fileobj.close()
248254
raise
249255
assert self.process.stdin is not None
250-
if "r" in mode:
256+
if "r" in self._mode:
251257
self.in_pipe = self.process.stdin
252258
# A python subprocess can read and write from pipes, but not from
253259
# Python in-memory objects. In order for a program to read from an
@@ -429,26 +435,38 @@ def flush(self) -> None:
429435

430436

431437
def _open_stdin_or_out(mode: str) -> BinaryIO:
432-
assert "b" in mode
433-
std = sys.stdout if "w" in mode else sys.stdin
438+
assert mode in ("rb", "ab", "wb")
439+
std = sys.stdin if mode == "rb" else sys.stdout
434440
return open(std.fileno(), mode=mode, closefd=False) # type: ignore
435441

436442

437-
def _open_bz2(filename: FileOrPath, mode: str, threads: Optional[int]):
438-
assert "b" in mode
443+
def _open_bz2(
444+
filename: FileOrPath,
445+
mode: str,
446+
compresslevel: Optional[int],
447+
threads: Optional[int],
448+
):
449+
assert mode in ("rb", "ab", "wb")
450+
if compresslevel is None:
451+
compresslevel = XOPEN_DEFAULT_BZ2_COMPRESSION
439452
if threads != 0:
440453
try:
441454
# pbzip2 can compress using multiple cores.
442455
return _PipedCompressionProgram(
443456
filename,
444457
mode,
458+
compresslevel,
445459
threads=threads,
446460
program_settings=_PROGRAM_SETTINGS["pbzip2"],
447461
)
448462
except OSError:
449463
pass # We try without threads.
450464

451-
return bz2.open(filename, mode)
465+
bz2_file = bz2.open(filename, mode, compresslevel)
466+
if "r" in mode:
467+
return bz2_file
468+
# Buffer writes on bz2.open to mitigate overhead of small writes
469+
return io.BufferedWriter(bz2_file) # type: ignore
452470

453471

454472
def _open_xz(
@@ -457,9 +475,9 @@ def _open_xz(
457475
compresslevel: Optional[int],
458476
threads: Optional[int],
459477
):
460-
assert "b" in mode
478+
assert mode in ("rb", "ab", "wb")
461479
if compresslevel is None:
462-
compresslevel = 6
480+
compresslevel = XOPEN_DEFAULT_XZ_COMPRESSION
463481

464482
if threads != 0:
465483
try:
@@ -474,23 +492,22 @@ def _open_xz(
474492
except OSError:
475493
pass # We try without threads.
476494

477-
return lzma.open(
478-
filename,
479-
mode,
480-
preset=compresslevel if "w" in mode else None,
481-
)
495+
if "r" in mode:
496+
return lzma.open(filename, mode)
497+
# Buffer writes on lzma.open to mitigate overhead of small writes
498+
return io.BufferedWriter(lzma.open(filename, mode, preset=compresslevel)) # type: ignore
482499

483500

484-
def _open_zst( # noqa: C901
501+
def _open_zst(
485502
filename: FileOrPath,
486503
mode: str,
487504
compresslevel: Optional[int],
488505
threads: Optional[int],
489506
):
490-
assert "b" in mode
507+
assert mode in ("rb", "ab", "wb")
491508
assert compresslevel != 0
492509
if compresslevel is None:
493-
compresslevel = 3
510+
compresslevel = XOPEN_DEFAULT_ZST_COMPRESSION
494511
if threads != 0:
495512
try:
496513
# zstd can compress using multiple cores
@@ -508,19 +525,22 @@ def _open_zst( # noqa: C901
508525

509526
if zstandard is None:
510527
raise ImportError("zstandard module (python-zstandard) not available")
511-
if compresslevel is not None and "w" in mode:
528+
if compresslevel is not None and "r" not in mode:
512529
cctx = zstandard.ZstdCompressor(level=compresslevel)
513530
else:
514531
cctx = None
515532
f = zstandard.open(filename, mode, cctx=cctx) # type: ignore
516533
if mode == "rb":
517534
return io.BufferedReader(f)
518-
elif mode == "wb":
519-
return io.BufferedWriter(f)
520-
return f
535+
return io.BufferedWriter(f) # mode "ab" and "wb"
521536

522537

523-
def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
538+
def _open_gz(
539+
filename: FileOrPath,
540+
mode: str,
541+
compresslevel: Optional[int],
542+
threads: Optional[int],
543+
):
524544
"""
525545
Open a gzip file. The ISA-L library is preferred when applicable because
526546
it is the fastest. Then zlib-ng which is not as fast, but supports all
@@ -530,11 +550,17 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
530550
only one core, it still finishes faster than using the builtin gzip library
531551
as the (de)compression is moved to another thread.
532552
"""
533-
assert "b" in mode
553+
assert mode in ("rb", "ab", "wb")
534554
if compresslevel is None:
535555
# Force the same compression level on every tool regardless of
536556
# library defaults
537557
compresslevel = XOPEN_DEFAULT_GZIP_COMPRESSION
558+
if compresslevel not in range(10):
559+
# Only levels 0-9 are accepted, regardless of what the backend supports
560+
# (zlib_ng supports -1, pigz supports 11 etc.)
561+
raise ValueError(
562+
f"gzip compresslevel must be in range 0-9, got {compresslevel}."
563+
)
538564

539565
if threads != 0:
540566
# Igzip level 0 does not output uncompressed deflate blocks as zlib does
@@ -547,17 +573,14 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
547573
threads=1,
548574
)
549575
if gzip_ng_threaded and zlib_ng:
550-
try:
551-
return gzip_ng_threaded.open(
552-
filename,
553-
mode,
554-
# zlib-ng level 1 is 50% bigger than zlib level 1. Level
555-
# 2 gives a size close to expectations.
556-
compresslevel=2 if compresslevel == 1 else compresslevel,
557-
threads=threads or max(_available_cpu_count(), 4),
558-
)
559-
except zlib_ng.error: # Bad compression level
560-
pass
576+
return gzip_ng_threaded.open(
577+
filename,
578+
mode,
579+
# zlib-ng level 1 is 50% bigger than zlib level 1. Level
580+
# 2 gives a size close to expectations.
581+
compresslevel=2 if compresslevel == 1 else compresslevel,
582+
threads=threads or max(_available_cpu_count(), 4),
583+
)
561584

562585
for program in ("pigz", "gzip"):
563586
try:
@@ -568,7 +591,8 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
568591
threads,
569592
_PROGRAM_SETTINGS[program],
570593
)
571-
except OSError:
594+
# ValueError is raised when the compresslevel is not supported, i.e. gzip with level 0.
595+
except (OSError, ValueError):
572596
pass # We try without threads.
573597
return _open_reproducible_gzip(filename, mode=mode, compresslevel=compresslevel)
574598

@@ -607,6 +631,9 @@ def _open_reproducible_gzip(filename, mode: str, compresslevel: int):
607631
# is called. This forces it to be closed.
608632
if closefd:
609633
gzip_file.myfileobj = fileobj
634+
if sys.version_info < (3, 12) and "r" not in mode:
635+
# From version 3.12 onwards, gzip is properly internally buffered for writing.
636+
return io.BufferedWriter(gzip_file) # type: ignore
610637
return gzip_file
611638

612639

@@ -701,7 +728,7 @@ def xopen(
701728
errors: Optional[str] = ...,
702729
newline: Optional[str] = ...,
703730
format: Optional[str] = ...,
704-
) -> TextIO:
731+
) -> io.TextIOWrapper:
705732
...
706733

707734

@@ -720,7 +747,7 @@ def xopen(
720747
...
721748

722749

723-
def xopen( # noqa: C901 # The function is complex, but readable.
750+
def xopen(
724751
filename: FileOrPath,
725752
mode: Literal["r", "w", "a", "rt", "rb", "wt", "wb", "at", "ab"] = "r",
726753
compresslevel: Optional[int] = None,
@@ -797,24 +824,12 @@ def xopen( # noqa: C901 # The function is complex, but readable.
797824
elif detected_format == "xz":
798825
opened_file = _open_xz(filename, binary_mode, compresslevel, threads)
799826
elif detected_format == "bz2":
800-
opened_file = _open_bz2(filename, binary_mode, threads)
827+
opened_file = _open_bz2(filename, binary_mode, compresslevel, threads)
801828
elif detected_format == "zst":
802829
opened_file = _open_zst(filename, binary_mode, compresslevel, threads)
803830
else:
804831
opened_file, _ = _file_or_path_to_binary_stream(filename, binary_mode)
805832

806-
# The "write" method for GzipFile is very costly. Lots of python calls are
807-
# made. To a lesser extent this is true for LzmaFile and BZ2File. By
808-
# putting a buffer in between, the expensive write method is called much
809-
# less. The effect is very noticeable when writing small units such as
810-
# lines or FASTQ records.
811-
if (
812-
isinstance(opened_file, (gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile)) # FIXME
813-
and "w" in mode
814-
):
815-
opened_file = io.BufferedWriter(
816-
opened_file, buffer_size=BUFFER_SIZE # type: ignore
817-
)
818833
if "t" in mode:
819834
return io.TextIOWrapper(opened_file, encoding, errors, newline)
820835
return opened_file

tests/test_xopen.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Tests for the xopen.xopen function
33
"""
44
import bz2
5+
import sys
56
from contextlib import contextmanager
67
import functools
78
import gzip
@@ -263,13 +264,16 @@ def test_invalid_compression_level(tmp_path):
263264

264265

265266
@pytest.mark.parametrize("ext", extensions)
266-
def test_append(ext, tmp_path):
267+
@pytest.mark.parametrize("threads", (0, 1))
268+
def test_append(ext, threads, tmp_path):
269+
if ext == ".zst" and zstandard is None and threads == 0:
270+
pytest.skip("No zstandard installed")
267271
text = b"AB"
268272
reference = text + text
269273
path = tmp_path / f"the-file{ext}"
270-
with xopen(path, "ab") as f:
274+
with xopen(path, "ab", threads=threads) as f:
271275
f.write(text)
272-
with xopen(path, "ab") as f:
276+
with xopen(path, "ab", threads=threads) as f:
273277
f.write(text)
274278
with xopen(path, "r") as f:
275279
for appended in f:
@@ -383,16 +387,22 @@ def test_write_no_threads(tmp_path, ext):
383387
return
384388
klass = klasses[ext]
385389
with xopen(tmp_path / f"out{ext}", "wb", threads=0) as f:
386-
assert isinstance(f, io.BufferedWriter)
387-
if ext:
388-
assert isinstance(f.raw, klass), f
390+
if isinstance(f, io.BufferedWriter):
391+
if ext:
392+
assert isinstance(f.raw, klass), f
393+
else:
394+
if ext:
395+
assert isinstance(f, klass)
389396

390397

391398
def test_write_gzip_no_threads_no_isal(tmp_path, xopen_without_igzip):
392399
import gzip
393400

394401
with xopen_without_igzip(tmp_path / "out.gz", "wb", threads=0) as f:
395-
assert isinstance(f.raw, gzip.GzipFile), f
402+
if sys.version_info.major == 3 and sys.version_info.minor >= 12:
403+
assert isinstance(f, gzip.GzipFile), f
404+
else:
405+
assert isinstance(f.raw, gzip.GzipFile)
396406

397407

398408
def test_write_stdout():

0 commit comments

Comments
 (0)