Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Closes #xxxx

- [ ] Tests added / passed
- [ ] Passes `pre-commit run --all-files`
- [ ] Passes `prek run --all-files`
7 changes: 2 additions & 5 deletions .github/workflows/ci-pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ on:

jobs:
checks:
name: pre-commit hooks
name: prek hooks
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4.1.3
- uses: actions/setup-python@v6
with:
python-version: '3.12'
- uses: pre-commit/action@v3.0.1
- uses: j178/prek-action@v1.1.1
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.10
rev: v0.15.6
hooks:
- id: ruff-check
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 25.12.0
rev: 26.3.1
hooks:
- id: black
- repo: https://github.com/codespell-project/codespell
rev: v2.4.1
rev: v2.4.2
hooks:
- id: codespell
additional_dependencies:
Expand Down Expand Up @@ -42,5 +42,5 @@ repos:
- git+https://github.com/dask/zict

# Increase this value to clear the cache on GitHub actions if nothing else in this file
# has changed. See also same variable in .github/workflows/test.yaml
# has changed. See also same variable in .github/workflows/tests.yaml
# CACHE_NUMBER: 1
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dependencies:
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prek
- prometheus_client
- psutil
- pyarrow
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies:
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prek
- prometheus_client
- psutil
- pyarrow=16.0
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies:
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prek
- prometheus_client
- psutil
- pyarrow
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.13.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies:
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prek
- prometheus_client
- psutil
- pyarrow
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.14.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies:
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prek
- prometheus_client
- psutil
- pyarrow
Expand Down
2 changes: 1 addition & 1 deletion distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def main( # type: ignore[no-untyped-def]

if contact_address:
# we only need this to verify it is getting parsed
(_, _) = get_address_host_port(contact_address, strict=True)
_, _ = get_address_host_port(contact_address, strict=True)
else:
# if contact address is not present we use the listen_address for contact
contact_address = listen_address
Expand Down
6 changes: 2 additions & 4 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@


def _raise_deprecated():
message = textwrap.dedent(
"""\
message = textwrap.dedent("""\
The 'ucx' protocol was removed from Distributed because UCX-Py has been deprecated.
To continue using protocol='ucx', please install 'distributed-ucxx' (conda-forge)
or 'distributed-ucxx-cu[12,13]' (PyPI, selecting 12 for CUDA version 12.*, and 13
for CUDA version 13.*).
"""
)
""")
warnings.warn(message, FutureWarning)
raise FutureWarning(message)

Expand Down
2 changes: 1 addition & 1 deletion distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def close_gracefully(self, reason: str = "nanny-close-gracefully") -> None:
"Closing Nanny gracefully at %r. Reason: %s", self.address_safe, reason
)

async def close( # type:ignore[override]
async def close( # type: ignore[override]
self, timeout: float = 5, reason: str = "nanny-close"
) -> Literal["OK"]:
"""
Expand Down
2 changes: 1 addition & 1 deletion distributed/protocol/tests/test_cupy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_serialize_cupy_from_rmm(size):
x_np = np.arange(size, dtype="u1")

x_np_desc = x_np.__array_interface__
(x_np_ptr, _) = x_np_desc["data"]
x_np_ptr, _ = x_np_desc["data"]
(x_np_size,) = x_np_desc["shape"]
x = rmm.DeviceBuffer(ptr=x_np_ptr, size=x_np_size)

Expand Down
2 changes: 1 addition & 1 deletion distributed/protocol/tests/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_serialize_numba_from_rmm(size):
x_np = np.arange(size, dtype="u1")

x_np_desc = x_np.__array_interface__
(x_np_ptr, _) = x_np_desc["data"]
x_np_ptr, _ = x_np_desc["data"]
(x_np_size,) = x_np_desc["shape"]
x = rmm.DeviceBuffer(ptr=x_np_ptr, size=x_np_size)

Expand Down
6 changes: 2 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5045,17 +5045,15 @@ def _generate_taskstates(
"releasing all the data before resubmitting another "
"computation. More details and help can be found at "
"https://github.com/dask/dask/issues/9888. "
+ textwrap.dedent(
f"""
+ textwrap.dedent(f"""
Debugging information
---------------------
old task state: {ts.state}
old run_spec: {ts.run_spec!r}
new run_spec: {dsk[k]!r}
old dependencies: {deps_lhs}
new dependencies: {deps_rhs}
"""
)
""")
)
else:
logger.debug(
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ async def test_rechunk_expand2(c, s, *ws):
--------
dask.array.tests.test_rechunk.test_rechunk_expand2
"""
(a, b) = (3, 2)
a, b = (3, 2)
orig = np.random.default_rng().uniform(0, 1, a**b).reshape((a,) * b)
for off, off2 in product(range(1, a - 1), range(1, a - 1)):
old = ((a - off, off),) * b
Expand Down
12 changes: 4 additions & 8 deletions distributed/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,12 @@ def test_logging_extended():


def test_default_logging_does_not_override_basic_config():
code = textwrap.dedent(
"""\
code = textwrap.dedent("""\
import logging
logging.basicConfig()
import distributed
logging.getLogger("distributed").warning("hello")
"""
)
""")
proc = subprocess.run(
[sys.executable, "-c", code], check=True, capture_output=True, encoding="utf8"
)
Expand All @@ -216,15 +214,13 @@ def test_default_logging_does_not_override_basic_config():


def test_basic_config_does_not_override_default_logging():
code = textwrap.dedent(
"""\
code = textwrap.dedent("""\
import logging
import distributed

logging.basicConfig()
logging.getLogger("distributed").warning("hello")
"""
)
""")
proc = subprocess.run(
[sys.executable, "-c", code], check=True, capture_output=True, encoding="utf8"
)
Expand Down
4 changes: 1 addition & 3 deletions distributed/tests/test_diskutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ def test_workspace_process_crash(tmp_path):
sys.stdout.flush()

time.sleep(100)
""" % dict(
base_dir=base_dir
)
""" % dict(base_dir=base_dir)

p = subprocess.Popen(
[sys.executable, "-c", code],
Expand Down
30 changes: 10 additions & 20 deletions distributed/tests/test_preload.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,10 @@ async def test_scheduler_startup_nanny(s):
@gen_test()
async def test_web_preload_worker():
port = open_port()
data = dedent(
f"""\
data = dedent(f"""\
import dask
dask.config.set(scheduler_address="tcp://127.0.0.1:{port}")
"""
).encode()
""").encode()
with mock.patch(
"urllib3.PoolManager.request",
**{"return_value.data": data},
Expand All @@ -252,33 +250,29 @@ async def test_web_preload_worker():
)
@gen_cluster(nthreads=[])
async def test_client_preload_text(s):
text = dedent(
"""\
text = dedent("""\
def dask_setup(client):
client.foo = "setup"


def dask_teardown(client):
client.foo = "teardown"
"""
)
""")
async with Client(address=s.address, asynchronous=True, preload=text) as c:
assert c.foo == "setup"
assert c.foo == "teardown"


@gen_cluster(nthreads=[])
async def test_client_preload_config(s):
text = dedent(
"""\
text = dedent("""\
def dask_setup(client):
client.foo = "setup"


def dask_teardown(client):
client.foo = "teardown"
"""
)
""")
with dask.config.set({"distributed.client.preload": [text]}):
async with Client(address=s.address, asynchronous=True) as c:
assert c.foo == "setup"
Expand All @@ -291,16 +285,14 @@ def dask_teardown(client):
)
@gen_cluster(nthreads=[])
async def test_client_preload_click(s):
text = dedent(
"""\
text = dedent("""\
import click

@click.command()
@click.argument("value")
def dask_setup(client, value):
client.foo = value
"""
)
""")
value = "setup"
async with Client(
address=s.address, asynchronous=True, preload=text, preload_argv=[[value]]
Expand Down Expand Up @@ -387,16 +379,14 @@ def dask_teardown(worker):

@gen_cluster(nthreads=[])
async def test_client_preload_config_click(s):
text = dedent(
"""\
text = dedent("""\
import click

@click.command()
@click.argument("value")
def dask_setup(client, value):
client.foo = value
"""
)
""")
value = "setup"
with dask.config.set(
{
Expand Down
17 changes: 5 additions & 12 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,7 @@ def qux(x):
@gen_cluster(client=True)
async def test_collect_versions(c, s, a, b):
cs = s.clients[c.id]
(w1, w2) = s.workers.values()
w1, w2 = s.workers.values()
assert cs.versions
assert w1.versions
assert w2.versions
Expand Down Expand Up @@ -3462,18 +3462,13 @@ def test_memorystate():
with pytest.warns(FutureWarning):
assert m.managed_in_memory == m.managed

assert (
repr(m)
== dedent(
"""
assert repr(m) == dedent("""
Process memory (RSS) : 100 B
- managed by Dask : 68 B
- unmanaged (old) : 15 B
- unmanaged (recent): 17 B
Spilled to disk : 12 B
"""
).lstrip()
)
""").lstrip()


def test_memorystate_sum():
Expand Down Expand Up @@ -3879,14 +3874,12 @@ async def test_rebalance_raises_missing_data3(c, s, a, b, explicit):
futures = await c.scatter(range(100), workers=[a.address])

if explicit:
pytest.xfail(
reason="""Freeing keys and gathering data is using different
pytest.xfail(reason="""Freeing keys and gathering data is using different
channels (stream vs explicit RPC). Therefore, the
partial-fail is very timing sensitive and subject to a race
condition. This test assumes that the data is freed before
the rebalance get_data requests come in but merely deleting
the futures is not sufficient to guarantee this"""
)
the futures is not sufficient to guarantee this""")
keys = [f.key for f in futures]
del futures
out = await s.rebalance(keys=keys)
Expand Down
12 changes: 4 additions & 8 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,7 @@ def test_popen_write_during_terminate_deadlock():
[
sys.executable,
"-c",
textwrap.dedent(
"""
textwrap.dedent("""
import signal
import threading

Expand All @@ -833,8 +832,7 @@ def cb(signum, frame):
signal.signal(signal.SIGINT, cb)
print('ready', flush=True)
e.wait()
"""
),
"""),
],
capture_output=True,
) as proc:
Expand All @@ -849,8 +847,7 @@ def test_popen_timeout(capsys):
[
sys.executable,
"-c",
textwrap.dedent(
"""
textwrap.dedent("""
import signal
import sys
import time
Expand All @@ -862,8 +859,7 @@ def test_popen_timeout(capsys):
while True:
time.sleep(0.1)
print("slept", flush=True)
"""
),
"""),
],
capture_output=True,
terminate_timeout=1,
Expand Down
Loading
Loading