Skip to content

Shuffle p2p #8813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rename keys
  • Loading branch information
phofl committed Aug 2, 2024
commit 344215141a3448d04025e16b45f5a13ed6950162
5 changes: 2 additions & 3 deletions distributed/shuffle/_shuffle_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections.abc import Generator, Hashable, Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from itertools import count, product
from itertools import product
from pathlib import Path
from typing import Any, Callable

Expand Down Expand Up @@ -68,7 +68,6 @@ def _p2p_shuffle( # type: ignore[no-untyped-def]
chunk_lengths[axis] = len(np.unique(result[1, :]))

transfer_keys = []
suffixes = count()

for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])):
chunk_indexer = sorted_indexer[:, start:stop].copy()
Expand All @@ -79,7 +78,7 @@ def _p2p_shuffle( # type: ignore[no-untyped-def]
chunk_indexer[0, :] -= chunk_boundaries[i]

for chunk_tuple in chunk_tuples:
key = (transfer_group,) + (next(suffixes),)
key = (transfer_group,) + convert_key(chunk_tuple, i, axis)
transfer_keys.append(key)
dsk[key] = (
shuffle_transfer,
Expand Down
Loading