|
2 | 2 | Tests for the xopen.xopen function
|
3 | 3 | """
|
4 | 4 | import bz2
|
| 5 | +import subprocess |
5 | 6 | import sys
|
| 7 | +import tempfile |
6 | 8 | from contextlib import contextmanager
|
7 | 9 | import functools
|
8 | 10 | import gzip
|
@@ -634,3 +636,75 @@ def test_pass_bytesio_for_reading_and_writing(ext, threads):
|
634 | 636 | filelike.seek(0)
|
635 | 637 | with xopen(filelike, "rb", format=format, threads=threads) as fh:
|
636 | 638 | assert fh.readline() == first_line
|
| 639 | + |
| 640 | + |
| 641 | +@pytest.mark.parametrize("threads", (0, 1)) |
| 642 | +def test_xopen_stdin(monkeypatch, ext, threads): |
| 643 | + if ext == ".zst" and zstandard is None: |
| 644 | + return |
| 645 | + # Add encoding to suppress encoding warnings |
| 646 | + with open(TEST_DIR / f"file.txt{ext}", "rt", encoding="latin-1") as in_file: |
| 647 | + monkeypatch.setattr("sys.stdin", in_file) |
| 648 | + with xopen("-", "rt", threads=threads) as f: |
| 649 | + data = f.read() |
| 650 | + assert data == CONTENT |
| 651 | + |
| 652 | + |
| 653 | +def test_xopen_stdout(monkeypatch): |
| 654 | + # Add encoding to suppress encoding warnings |
| 655 | + with tempfile.TemporaryFile(mode="w+t", encoding="latin-1") as raw: |
| 656 | + monkeypatch.setattr("sys.stdout", raw) |
| 657 | + with xopen("-", "wt") as f: |
| 658 | + f.write("Hello world!") |
| 659 | + raw.seek(0) |
| 660 | + data = raw.read() |
| 661 | + assert data == "Hello world!" |
| 662 | + |
| 663 | + |
| 664 | +@pytest.mark.parametrize("threads", (0, 1)) |
| 665 | +def test_xopen_read_from_pipe(ext, threads): |
| 666 | + if ext == ".zst" and zstandard is None: |
| 667 | + return |
| 668 | + in_file = TEST_DIR / f"file.txt{ext}" |
| 669 | + process = subprocess.Popen(("cat", str(in_file)), stdout=subprocess.PIPE) |
| 670 | + with xopen(process.stdout, "rt", threads=threads) as f: |
| 671 | + data = f.read() |
| 672 | + process.wait() |
| 673 | + process.stdout.close() |
| 674 | + assert data == CONTENT |
| 675 | + |
| 676 | + |
| 677 | +@pytest.mark.parametrize("threads", (0, 1)) |
| 678 | +def test_xopen_write_to_pipe(threads, ext): |
| 679 | + if ext == ".zst" and zstandard is None: |
| 680 | + return |
| 681 | + format = ext.lstrip(".") |
| 682 | + if format == "": |
| 683 | + format = None |
| 684 | + process = subprocess.Popen(("cat",), stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
| 685 | + with xopen(process.stdin, "wt", threads=threads, format=format) as f: |
| 686 | + f.write(CONTENT) |
| 687 | + process.stdin.close() |
| 688 | + with xopen(process.stdout, "rt", threads=threads) as f: |
| 689 | + data = f.read() |
| 690 | + process.wait() |
| 691 | + process.stdout.close() |
| 692 | + assert data == CONTENT |
| 693 | + |
| 694 | + |
| 695 | +@pytest.mark.skipif( |
| 696 | + not os.path.exists("/dev/stdin"), reason="/dev/stdin does not exist" |
| 697 | +) |
| 698 | +@pytest.mark.parametrize("threads", (0, 1)) |
| 699 | +def test_xopen_dev_stdin_read(threads, ext): |
| 700 | + if ext == ".zst" and zstandard is None: |
| 701 | + return |
| 702 | + file = str(Path(__file__).parent / f"file.txt{ext}") |
| 703 | + result = subprocess.run( |
| 704 | + f"cat {file} | python -c 'import xopen; " |
| 705 | + f'f=xopen.xopen("/dev/stdin", "rt", threads={threads});print(f.read())\'', |
| 706 | + shell=True, |
| 707 | + stdout=subprocess.PIPE, |
| 708 | + encoding="ascii", |
| 709 | + ) |
| 710 | + assert result.stdout == CONTENT + "\n" |
0 commit comments