25
25
Dict ,
26
26
Optional ,
27
27
Union ,
28
- TextIO ,
29
28
IO ,
30
29
overload ,
31
30
BinaryIO ,
40
39
BUFFER_SIZE = max (io .DEFAULT_BUFFER_SIZE , 128 * 1024 )
41
40
42
41
XOPEN_DEFAULT_GZIP_COMPRESSION = 1
42
+ XOPEN_DEFAULT_BZ2_COMPRESSION = 9
43
+ XOPEN_DEFAULT_XZ_COMPRESSION = 6
44
+ XOPEN_DEFAULT_ZST_COMPRESSION = 3
43
45
44
46
igzip : Optional [ModuleType ]
45
47
isal_zlib : Optional [ModuleType ]
@@ -163,7 +165,7 @@ class _PipedCompressionProgram(io.IOBase):
163
165
Read and write compressed files by running an external process and piping into it.
164
166
"""
165
167
166
- def __init__ ( # noqa: C901
168
+ def __init__ (
167
169
self ,
168
170
filename : FileOrPath ,
169
171
mode = "rb" ,
@@ -197,9 +199,10 @@ def __init__( # noqa: C901
197
199
raise ValueError (
198
200
f"compresslevel must be in { program_settings .acceptable_compression_levels } ."
199
201
)
202
+ self ._compresslevel = compresslevel
200
203
self .fileobj , self .closefd = _file_or_path_to_binary_stream (filename , mode )
201
- filepath = _filepath_from_path_or_filelike (filename )
202
- self .name : str = str (filepath )
204
+ self . _path = _filepath_from_path_or_filelike (filename )
205
+ self .name : str = str (self . _path )
203
206
self ._mode : str = mode
204
207
self ._stderr = tempfile .TemporaryFile ("w+b" )
205
208
self ._threads_flag : Optional [str ] = program_settings .threads_flag
@@ -213,7 +216,10 @@ def __init__( # noqa: C901
213
216
threads = min (_available_cpu_count (), 4 )
214
217
self ._threads = threads
215
218
216
- if threads != 0 and self ._threads_flag is not None :
219
+ self ._open_process ()
220
+
221
+ def _open_process (self ):
222
+ if self ._threads != 0 and self ._threads_flag is not None :
217
223
self ._program_args += [f"{ self ._threads_flag } { self ._threads } " ]
218
224
219
225
# Setting close_fds to True in the Popen arguments is necessary due to
@@ -227,12 +233,12 @@ def __init__( # noqa: C901
227
233
self .in_pipe = None
228
234
self .in_thread = None
229
235
self ._feeding = True
230
- if "r" in mode :
236
+ if "r" in self . _mode :
231
237
self ._program_args += ["-c" , "-d" ] # type: ignore
232
238
stdout = subprocess .PIPE
233
239
else :
234
- if compresslevel is not None :
235
- self ._program_args += ["-" + str (compresslevel )]
240
+ if self . _compresslevel is not None :
241
+ self ._program_args += ["-" + str (self . _compresslevel )]
236
242
stdout = self .fileobj # type: ignore
237
243
try :
238
244
self .process = subprocess .Popen (
@@ -247,7 +253,7 @@ def __init__( # noqa: C901
247
253
self .fileobj .close ()
248
254
raise
249
255
assert self .process .stdin is not None
250
- if "r" in mode :
256
+ if "r" in self . _mode :
251
257
self .in_pipe = self .process .stdin
252
258
# A python subprocess can read and write from pipes, but not from
253
259
# Python in-memory objects. In order for a program to read from an
@@ -429,26 +435,38 @@ def flush(self) -> None:
429
435
430
436
431
437
def _open_stdin_or_out (mode : str ) -> BinaryIO :
432
- assert "b" in mode
433
- std = sys .stdout if "w" in mode else sys .stdin
438
+ assert mode in ( "rb" , "ab" , "wb" )
439
+ std = sys .stdin if mode == "rb" else sys .stdout
434
440
return open (std .fileno (), mode = mode , closefd = False ) # type: ignore
435
441
436
442
437
- def _open_bz2 (filename : FileOrPath , mode : str , threads : Optional [int ]):
438
- assert "b" in mode
443
+ def _open_bz2 (
444
+ filename : FileOrPath ,
445
+ mode : str ,
446
+ compresslevel : Optional [int ],
447
+ threads : Optional [int ],
448
+ ):
449
+ assert mode in ("rb" , "ab" , "wb" )
450
+ if compresslevel is None :
451
+ compresslevel = XOPEN_DEFAULT_BZ2_COMPRESSION
439
452
if threads != 0 :
440
453
try :
441
454
# pbzip2 can compress using multiple cores.
442
455
return _PipedCompressionProgram (
443
456
filename ,
444
457
mode ,
458
+ compresslevel ,
445
459
threads = threads ,
446
460
program_settings = _PROGRAM_SETTINGS ["pbzip2" ],
447
461
)
448
462
except OSError :
449
463
pass # We try without threads.
450
464
451
- return bz2 .open (filename , mode )
465
+ bz2_file = bz2 .open (filename , mode , compresslevel )
466
+ if "r" in mode :
467
+ return bz2_file
468
+ # Buffer writes on bz2.open to mitigate overhead of small writes
469
+ return io .BufferedWriter (bz2_file ) # type: ignore
452
470
453
471
454
472
def _open_xz (
@@ -457,9 +475,9 @@ def _open_xz(
457
475
compresslevel : Optional [int ],
458
476
threads : Optional [int ],
459
477
):
460
- assert "b" in mode
478
+ assert mode in ( "rb" , "ab" , "wb" )
461
479
if compresslevel is None :
462
- compresslevel = 6
480
+ compresslevel = XOPEN_DEFAULT_XZ_COMPRESSION
463
481
464
482
if threads != 0 :
465
483
try :
@@ -474,23 +492,22 @@ def _open_xz(
474
492
except OSError :
475
493
pass # We try without threads.
476
494
477
- return lzma .open (
478
- filename ,
479
- mode ,
480
- preset = compresslevel if "w" in mode else None ,
481
- )
495
+ if "r" in mode :
496
+ return lzma .open (filename , mode )
497
+ # Buffer writes on lzma.open to mitigate overhead of small writes
498
+ return io .BufferedWriter (lzma .open (filename , mode , preset = compresslevel )) # type: ignore
482
499
483
500
484
- def _open_zst ( # noqa: C901
501
+ def _open_zst (
485
502
filename : FileOrPath ,
486
503
mode : str ,
487
504
compresslevel : Optional [int ],
488
505
threads : Optional [int ],
489
506
):
490
- assert "b" in mode
507
+ assert mode in ( "rb" , "ab" , "wb" )
491
508
assert compresslevel != 0
492
509
if compresslevel is None :
493
- compresslevel = 3
510
+ compresslevel = XOPEN_DEFAULT_ZST_COMPRESSION
494
511
if threads != 0 :
495
512
try :
496
513
# zstd can compress using multiple cores
@@ -508,19 +525,22 @@ def _open_zst( # noqa: C901
508
525
509
526
if zstandard is None :
510
527
raise ImportError ("zstandard module (python-zstandard) not available" )
511
- if compresslevel is not None and "w" in mode :
528
+ if compresslevel is not None and "r" not in mode :
512
529
cctx = zstandard .ZstdCompressor (level = compresslevel )
513
530
else :
514
531
cctx = None
515
532
f = zstandard .open (filename , mode , cctx = cctx ) # type: ignore
516
533
if mode == "rb" :
517
534
return io .BufferedReader (f )
518
- elif mode == "wb" :
519
- return io .BufferedWriter (f )
520
- return f
535
+ return io .BufferedWriter (f ) # mode "ab" and "wb"
521
536
522
537
523
- def _open_gz (filename : FileOrPath , mode : str , compresslevel , threads ):
538
+ def _open_gz (
539
+ filename : FileOrPath ,
540
+ mode : str ,
541
+ compresslevel : Optional [int ],
542
+ threads : Optional [int ],
543
+ ):
524
544
"""
525
545
Open a gzip file. The ISA-L library is preferred when applicable because
526
546
it is the fastest. Then zlib-ng which is not as fast, but supports all
@@ -530,11 +550,17 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
530
550
only one core, it still finishes faster than using the builtin gzip library
531
551
as the (de)compression is moved to another thread.
532
552
"""
533
- assert "b" in mode
553
+ assert mode in ( "rb" , "ab" , "wb" )
534
554
if compresslevel is None :
535
555
# Force the same compression level on every tool regardless of
536
556
# library defaults
537
557
compresslevel = XOPEN_DEFAULT_GZIP_COMPRESSION
558
+ if compresslevel not in range (10 ):
559
+ # Level 0-9 are supported regardless of backend support
560
+ # (zlib_ng supports -1, pigz supports 11 etc.)
561
+ raise ValueError (
562
+ f"gzip compresslevel must be in range 0-9, got { compresslevel } ."
563
+ )
538
564
539
565
if threads != 0 :
540
566
# Igzip level 0 does not output uncompressed deflate blocks as zlib does
@@ -547,17 +573,14 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
547
573
threads = 1 ,
548
574
)
549
575
if gzip_ng_threaded and zlib_ng :
550
- try :
551
- return gzip_ng_threaded .open (
552
- filename ,
553
- mode ,
554
- # zlib-ng level 1 is 50% bigger than zlib level 1. Level
555
- # 2 gives a size close to expectations.
556
- compresslevel = 2 if compresslevel == 1 else compresslevel ,
557
- threads = threads or max (_available_cpu_count (), 4 ),
558
- )
559
- except zlib_ng .error : # Bad compression level
560
- pass
576
+ return gzip_ng_threaded .open (
577
+ filename ,
578
+ mode ,
579
+ # zlib-ng level 1 is 50% bigger than zlib level 1. Level
580
+ # 2 gives a size close to expectations.
581
+ compresslevel = 2 if compresslevel == 1 else compresslevel ,
582
+ threads = threads or max (_available_cpu_count (), 4 ),
583
+ )
561
584
562
585
for program in ("pigz" , "gzip" ):
563
586
try :
@@ -568,7 +591,8 @@ def _open_gz(filename: FileOrPath, mode: str, compresslevel, threads):
568
591
threads ,
569
592
_PROGRAM_SETTINGS [program ],
570
593
)
571
- except OSError :
594
+ # ValueError when compresslevel is not supported. i.e. gzip and level 0
595
+ except (OSError , ValueError ):
572
596
pass # We try without threads.
573
597
return _open_reproducible_gzip (filename , mode = mode , compresslevel = compresslevel )
574
598
@@ -607,6 +631,9 @@ def _open_reproducible_gzip(filename, mode: str, compresslevel: int):
607
631
# is called. This forces it to be closed.
608
632
if closefd :
609
633
gzip_file .myfileobj = fileobj
634
+ if sys .version_info < (3 , 12 ) and "r" not in mode :
635
+ # From version 3.12 onwards, gzip is properly internally buffered for writing.
636
+ return io .BufferedWriter (gzip_file ) # type: ignore
610
637
return gzip_file
611
638
612
639
@@ -701,7 +728,7 @@ def xopen(
701
728
errors : Optional [str ] = ...,
702
729
newline : Optional [str ] = ...,
703
730
format : Optional [str ] = ...,
704
- ) -> TextIO :
731
+ ) -> io . TextIOWrapper :
705
732
...
706
733
707
734
@@ -720,7 +747,7 @@ def xopen(
720
747
...
721
748
722
749
723
- def xopen ( # noqa: C901 # The function is complex, but readable.
750
+ def xopen (
724
751
filename : FileOrPath ,
725
752
mode : Literal ["r" , "w" , "a" , "rt" , "rb" , "wt" , "wb" , "at" , "ab" ] = "r" ,
726
753
compresslevel : Optional [int ] = None ,
@@ -797,24 +824,12 @@ def xopen( # noqa: C901 # The function is complex, but readable.
797
824
elif detected_format == "xz" :
798
825
opened_file = _open_xz (filename , binary_mode , compresslevel , threads )
799
826
elif detected_format == "bz2" :
800
- opened_file = _open_bz2 (filename , binary_mode , threads )
827
+ opened_file = _open_bz2 (filename , binary_mode , compresslevel , threads )
801
828
elif detected_format == "zst" :
802
829
opened_file = _open_zst (filename , binary_mode , compresslevel , threads )
803
830
else :
804
831
opened_file , _ = _file_or_path_to_binary_stream (filename , binary_mode )
805
832
806
- # The "write" method for GzipFile is very costly. Lots of python calls are
807
- # made. To a lesser extent this is true for LzmaFile and BZ2File. By
808
- # putting a buffer in between, the expensive write method is called much
809
- # less. The effect is very noticeable when writing small units such as
810
- # lines or FASTQ records.
811
- if (
812
- isinstance (opened_file , (gzip .GzipFile , bz2 .BZ2File , lzma .LZMAFile )) # FIXME
813
- and "w" in mode
814
- ):
815
- opened_file = io .BufferedWriter (
816
- opened_file , buffer_size = BUFFER_SIZE # type: ignore
817
- )
818
833
if "t" in mode :
819
834
return io .TextIOWrapper (opened_file , encoding , errors , newline )
820
835
return opened_file
0 commit comments