Shuffle p2p #8813

Draft: wants to merge 7 commits into base: main
Changes from 1 commit
Fixup p2p bugs
phofl committed Aug 7, 2024
commit 938ec20bdca01ea28aafa700dc4fc0a375700711
5 changes: 4 additions & 1 deletion distributed/shuffle/_shuffle_array.py
@@ -1,26 +1,26 @@
from __future__ import annotations

import functools
import mmap
import os
from collections import defaultdict
from collections.abc import Generator, Hashable, Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from itertools import product
from pathlib import Path
from typing import Any, Callable

import numpy as np
from tornado.ioloop import IOLoop

from dask.sizeof import sizeof
from dask.typing import Key

from distributed.core import PooledRPCCall
from distributed.metrics import context_meter
from distributed.shuffle import ShuffleWorkerPlugin
from distributed.shuffle._core import (
    NDIndex,
    ShuffleId,
    ShuffleRun,
@@ -29,59 +29,62 @@
    get_worker_plugin,
    handle_transfer_errors,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._pickle import unpickle_bytestream
from distributed.shuffle._rechunk import rechunk_unpack
from distributed.shuffle._shuffle import shuffle_barrier


def shuffle_name(token: str) -> str:
    return f"shuffle-p2p-{token}"


def _p2p_shuffle(  # type: ignore[no-untyped-def]
    chunks, new_chunks, axis, in_name: str, out_name: str, disk: bool = True
) -> dict[Key, Any]:
    from dask.array._shuffle import convert_key

    arrays = []
    for i, new_chunk in enumerate(new_chunks):
        arrays.append(np.array([new_chunk, [i] * len(new_chunk)]))

    result = np.concatenate(arrays, axis=1)
    sorter = np.argsort(result[0, :])
    sorted_indexer = result[:, sorter]
    chunk_boundaries = np.cumsum((0,) + chunks[axis])

    dsk: dict[Key, Any] = {}

    # Use `token` to generate a canonical group for the entire rechunk
    token = out_name.split("-")[-1]
    transfer_group = f"shuffle-transfer-{token}"
    unpack_group = out_name
    _barrier_key = barrier_key(ShuffleId(token))

    # Get existing chunk tuple locations
    chunk_tuples = list(
        product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis))
    )
    chunk_lengths = [len(c) for c in chunks]
    chunk_lengths[axis] = len(np.unique(result[1, :]))

    transfer_keys = []

    for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])):
        start = np.searchsorted(sorted_indexer[0, :], start)
        stop = np.searchsorted(sorted_indexer[0, :], stop)

        chunk_indexer = sorted_indexer[:, start:stop].copy()
-       if len(chunk_indexer) == 0:
+       if chunk_indexer.shape[1] == 0:
            # skip output chunks that don't get any data
            continue

        chunk_indexer[0, :] -= chunk_boundaries[i]

        for chunk_tuple in chunk_tuples:
            key = (transfer_group,) + convert_key(chunk_tuple, i, axis)
            transfer_keys.append(key)
            dsk[key] = (
                shuffle_transfer,
                (in_name,) + convert_key(chunk_tuple, i, axis),
                token,
@@ -93,15 +96,15 @@
                disk,
            )

    dsk[_barrier_key] = (shuffle_barrier, token, transfer_keys)

    for axis_chunk in np.unique(result[1, :]):
        sorter = np.argsort(result[0, result[1, :] == axis_chunk])

        for chunk_tuple in chunk_tuples:
            chunk_key = convert_key(chunk_tuple, int(axis_chunk), axis)

            dsk[(unpack_group,) + chunk_key] = (
                _shuffle_unpack,
                token,
                chunk_key,
@@ -109,25 +112,25 @@
                sorter,
                axis,
            )
    return dsk
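
Review note: the switch from `len(chunk_indexer) == 0` to `chunk_indexer.shape[1] == 0` in `_p2p_shuffle` matters because `chunk_indexer` is a two-row array (source positions and target chunk numbers), so its `len` is 2 even when no positions fall into the slice. The sketch below reproduces the indexer construction in isolation; `split_indexer` and its inputs are hypothetical names made up for illustration, not part of this PR.

```python
import numpy as np


def split_indexer(new_chunks, input_chunk_sizes):
    """Group (source position, target chunk) pairs by input chunk.

    Hypothetical helper that mirrors the indexer logic of ``_p2p_shuffle``.
    """
    # Row 0: position taken from the input axis; row 1: output chunk receiving it.
    pairs = np.concatenate(
        [np.array([idx, [i] * len(idx)]) for i, idx in enumerate(new_chunks)],
        axis=1,
    )
    sorted_pairs = pairs[:, np.argsort(pairs[0, :])]
    boundaries = np.cumsum((0,) + tuple(input_chunk_sizes))

    per_input_chunk = []
    for i, (lo, hi) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        start = np.searchsorted(sorted_pairs[0, :], lo)
        stop = np.searchsorted(sorted_pairs[0, :], hi)
        chunk = sorted_pairs[:, start:stop].copy()
        chunk[0, :] -= boundaries[i]  # positions relative to the input chunk
        per_input_chunk.append(chunk)
    return per_input_chunk


parts = split_indexer(new_chunks=[[0, 5], [4, 1]], input_chunk_sizes=(2, 2, 2))
empty = parts[1]  # input chunk 1 holds positions 2 and 3, which nobody requests

print(len(empty))      # number of rows, not number of selected positions
print(empty.shape[1])  # number of selected positions
```

In this toy input the middle input chunk contributes nothing, yet a `len`-based check would not skip it; checking the second dimension does.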


def _shuffle_unpack(
    id: ShuffleId,
    output_chunk: NDIndex,
    barrier_run_id: int,
    sorter: np.ndarray,
    axis: int,
) -> np.ndarray:
    result = rechunk_unpack(id, output_chunk, barrier_run_id)
    slicer = [
        slice(None),
    ] * len(result.shape)
    slicer[axis] = np.argsort(sorter)  # type: ignore[call-overload]
    return result[*slicer]
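
Review note: `_shuffle_unpack` applies `np.argsort(sorter)` along `axis`. Taking the argsort of a permutation yields its inverse, so this appears to undo the position-sorted order in which the shards are assembled and restore the order the output chunk asked for. The standalone sketch below illustrates that property with made-up values. It indexes with `tuple(slicer)`; as far as I know, star-unpacking inside a subscript (`result[*slicer]`) is only accepted by Python 3.11 and later, while the tuple form is equivalent and works on older interpreters.

```python
import numpy as np

requested = np.array([5, 0, 3])  # positions in the order the output chunk wants them
sorter = np.argsort(requested)   # permutation that sorts the positions
arrived = requested[sorter]      # stand-in for data assembled in sorted order
inverse = np.argsort(sorter)     # inverse permutation of ``sorter``
restored = arrived[inverse]      # back in the requested order

# The same reordering along one axis of a 2-D block, using a tuple index
block = np.arange(6).reshape(2, 3)
slicer = [slice(None)] * block.ndim
slicer[1] = inverse
reordered = block[tuple(slicer)]
```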


def shuffle_transfer(
    input: np.ndarray,
    id: ShuffleId,
    chunk_indexer: tuple[np.ndarray, np.ndarray],
@@ -137,8 +140,8 @@
    input_chunk: Any,
    disk: bool,
) -> int:
    with handle_transfer_errors(id):
        return get_worker_plugin().add_partition(
            input,
            partition_id=input_chunk,
            spec=ArrayShuffleSpec(
@@ -149,37 +152,37 @@
        )


@dataclass(frozen=True)
class ArrayShuffleSpec(ShuffleSpec[NDIndex]):
    chunk_lengths: tuple[int, ...]
    axis: int

    @property
    def output_partitions(self) -> Generator[NDIndex, None, None]:
        yield from product(*(range(c) for c in self.chunk_lengths))

    @functools.cached_property
    def positions(self) -> list[int]:
        return [1] + np.cumprod(self.chunk_lengths).tolist()

    def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str:
        npartitions = 1
        for c in self.chunk_lengths:
            npartitions *= c
        ix = 0
        for dim, pos in enumerate(partition):
            ix += self.positions[dim] * pos
        i = len(workers) * ix // npartitions
        return workers[i]

    def create_run_on_worker(
        self,
        run_id: int,
        span_id: str | None,
        worker_for: dict[NDIndex, str],
        plugin: ShuffleWorkerPlugin,
    ) -> ShuffleRun:
        return ArrayShuffleRun(
            worker_for=worker_for,
            axis=self.axis,
            id=self.id,
@@ -201,7 +204,7 @@
        )
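
Review note: `ArrayShuffleSpec.pick_worker` flattens the N-dimensional output index into a single integer using `positions` (cumulative products of `chunk_lengths`, with the first dimension varying fastest) and then scales that integer onto the worker list, so each worker receives a contiguous range of flattened indices. The function below is a standalone copy of that arithmetic for experimentation; the worker names and the 2x3 grid are assumptions chosen for the example.

```python
import numpy as np


def pick_worker(partition, chunk_lengths, workers):
    """Standalone copy of the arithmetic in ``ArrayShuffleSpec.pick_worker``."""
    positions = [1] + np.cumprod(chunk_lengths).tolist()
    npartitions = positions[-1]  # product of all chunk lengths
    ix = sum(positions[dim] * pos for dim, pos in enumerate(partition))
    return workers[len(workers) * ix // npartitions]


workers = ["w0", "w1"]  # hypothetical worker addresses
assignment = {
    (i, j): pick_worker((i, j), (2, 3), workers) for i in range(2) for j in range(3)
}
```

With these inputs the six output chunks split evenly, three per worker, with the flattened indices 0 to 2 on the first worker and 3 to 5 on the second.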


class ArrayShuffleRun(ShuffleRun[NDIndex, "np.ndarray"]):
    """State for a single active rechunk execution

    This object is responsible for splitting, sending, receiving and combining
@@ -245,7 +248,7 @@
        buffer.
    """

    def __init__(
        self,
        worker_for: dict[NDIndex, str],
        axis: int,
@@ -263,7 +266,7 @@
        disk: bool,
        loop: IOLoop,
    ):
        super().__init__(
            id=id,
            run_id=run_id,
            span_id=span_id,
@@ -278,95 +281,95 @@
            disk=disk,
            loop=loop,
        )
        self.axis = axis
        partitions_of = defaultdict(list)
        for part, addr in worker_for.items():
            partitions_of[addr].append(part)
        self.partitions_of = dict(partitions_of)
        self.worker_for = worker_for

    def _shard_partition(  # type: ignore[override]
        self,
        data: np.ndarray,
        partition_id: NDIndex,
        chunk_indexer: tuple[np.ndarray, np.ndarray],
        chunk_tuple: tuple[int, ...],
    ) -> dict[str, tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]]:
        from dask.array._shuffle import convert_key

        out: dict[str, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]] = defaultdict(
            list
        )
        shards_size = 0
        shards_count = 0

        target_chunk_nrs, taker_boundary = np.unique(
            chunk_indexer[1], return_index=True
        )

        for target_chunk in target_chunk_nrs:
            ndslice = [
                slice(None),
            ] * len(data.shape)
            ndslice[self.axis] = chunk_indexer[0][chunk_indexer[1] == target_chunk]
            shard = data[*ndslice]
            # Don't wait until all shards have been transferred over the network
            # before data can be released
            if shard.base is not None:
                shard = shard.copy()

            shards_size += shard.nbytes
            shards_count += 1
            chunk_index = convert_key(chunk_tuple, target_chunk, self.axis)

            out[self.worker_for[chunk_index]].append(
                (chunk_index, (partition_id, shard))
            )

        context_meter.digest_metric("p2p-shards", shards_size, "bytes")
        context_meter.digest_metric("p2p-shards", shards_count, "count")
        return {k: (partition_id, v) for k, v in out.items()}

    async def _receive(
        self,
        data: list[tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]],
    ) -> None:
        self.raise_if_closed()

        # Repartition shards and filter out already received ones
        shards = defaultdict(list)
        for d in data:
            id1, payload = d
            if id1 in self.received:
                continue
            self.received.add(id1)
            for id2, shard in payload:
                shards[id2].append(shard)
            self.total_recvd += sizeof(d)
        del data
        if not shards:
            return

        try:
            await self._write_to_disk(shards)
        except Exception as e:
            self._exception = e
            raise

    def _get_output_partition(
        self, partition_id: NDIndex, key: Key, **kwargs: Any
    ) -> np.ndarray:
        # Quickly read metadata from disk.
        # This is a bunch of seek()'s interleaved with short reads.
        data = self._read_from_disk(partition_id)
        # Copy the memory-mapped buffers from disk into memory.
        # This is where we'll spend most time.
        return _convert_chunk(data, self.axis)

    def deserialize(self, buffer: Any) -> Any:
        return buffer

    def read(self, path: Path) -> tuple[list[list[tuple[NDIndex, np.ndarray]]], int]:
        """Open a memory-mapped file descriptor to disk, read all metadata, and unpickle
        all arrays. This is a fast sequence of short reads interleaved with seeks.
        Do not read in memory the actual data; the arrays' buffers will point to the
@@ -376,28 +379,28 @@
        returned arrays are dereferenced, which will happen after the call to
        concatenate3.
        """
        with path.open(mode="r+b") as fh:
            buffer = memoryview(mmap.mmap(fh.fileno(), 0))

        # The file descriptor has *not* been closed!
        shards = list(unpickle_bytestream(buffer))
        return shards, buffer.nbytes

    def _get_assigned_worker(self, id: NDIndex) -> str:
        return self.worker_for[id]
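
Review note: the core of `ArrayShuffleRun._shard_partition` above is a per-target fancy-index along the shuffle axis: for each target chunk number, the matching source positions are gathered from the input chunk into one shard. The sketch below isolates that step; `shard_along_axis` is a hypothetical helper, and the worker routing, metrics, and `convert_key` call from the real method are deliberately left out.

```python
import numpy as np


def shard_along_axis(data, chunk_indexer, axis):
    """Split ``data`` into one shard per target chunk (hypothetical helper)."""
    positions, targets = chunk_indexer
    shards = {}
    for target in np.unique(targets):
        ndslice = [slice(None)] * data.ndim
        ndslice[axis] = positions[targets == target]
        shard = data[tuple(ndslice)]
        # Copy if the shard still references another buffer, so the input
        # chunk can be released without waiting for every shard to be sent
        if shard.base is not None:
            shard = shard.copy()
        shards[int(target)] = shard
    return shards


data = np.arange(12).reshape(3, 4)
# Column 1 goes to target chunk 0; columns 0 and 3 go to target chunk 1
indexer = (np.array([0, 1, 3]), np.array([1, 0, 1]))
shards = shard_along_axis(data, indexer, axis=1)
```

Column 2 of the toy input is not requested by any target, so it appears in no shard.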


def _convert_chunk(
    shards: list[list[tuple[NDIndex, np.ndarray]]], axis: int
) -> np.ndarray:
    import numpy as np

    indexed: dict[NDIndex, np.ndarray] = {}
    for sublist in shards:
        for index, shard in sublist:
            indexed[index] = shard

    arrs = [indexed[i] for i in sorted(indexed.keys())]
    # This may block for several seconds, as it physically reads the memory-mapped
    # buffers from disk
    return np.concatenate(arrs, axis=axis)
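
Review note: `_convert_chunk` relies on the shard keys sorting into the order in which the pieces should be laid out along `axis`; a later shard with the same key silently replaces an earlier one. The sketch below exercises the same logic on in-memory arrays. The single-element tuple keys are an assumption made for the example, and the memory-mapped buffers of the real code path are not modelled.

```python
import numpy as np


def convert_chunk(shards, axis):
    """Mirror of ``_convert_chunk``: index shards, sort by key, concatenate."""
    indexed = {}
    for sublist in shards:
        for index, shard in sublist:
            indexed[index] = shard
    return np.concatenate([indexed[i] for i in sorted(indexed)], axis=axis)


# Shards keyed by hypothetical input-chunk indices, arriving out of order
shards = [
    [((1,), np.array([[2, 3]]))],
    [((0,), np.array([[0, 1]])), ((2,), np.array([[4]]))],
]
out = convert_chunk(shards, axis=1)
```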
