Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions src/sshkeyboard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from platform import system
from time import time
from types import SimpleNamespace
from typing import Any, Callable, Optional
from typing import Any, Callable, Generator, Optional, TextIO, Tuple

try:
from ._asyncio_run_backport_36 import run36
Expand Down Expand Up @@ -166,7 +166,6 @@ def listen_keyboard(
max_thread_pool_workers: Optional[int] = None,
sleep: float = 0.01,
) -> None:

"""Listen for keyboard events and fire `on_press` and `on_release` callback
functions

Expand Down Expand Up @@ -380,11 +379,14 @@ def press(key):
_should_run = False


def _is_python_36():
def _is_python_36() -> bool:
return sys.version_info.major == 3 and sys.version_info.minor == 6


def _check_callback_ok(function, name):
def _check_callback_ok(
function: Optional[Callable[[str], Any]],
name: str,
) -> None:
if function is not None:
assert callable(function), f"{name} must be None or callable"
assert _takes_at_least_one_param(
Expand All @@ -396,12 +398,12 @@ def _check_callback_ok(function, name):
)


def _takes_at_least_one_param(function):
def _takes_at_least_one_param(function: Callable[[str], Any]) -> bool:
sig = signature(function)
return len(sig.parameters.values()) >= 1


def _default_empty_params(function):
def _default_empty_params(function: Callable[[str], Any]) -> Tuple[str, ...]:
sig = signature(function)
return tuple(
param.name
Expand All @@ -413,20 +415,25 @@ def _default_empty_params(function):
)


def _max_one_param_without_default(function):
def _max_one_param_without_default(function: Callable[[str], Any]) -> bool:
default_empty_params = _default_empty_params(function)
return len(default_empty_params) <= 1


def _done(task):
if not task.cancelled() and task.exception() is not None:
def _done(task: asyncio.Task[Any] | concurrent.futures.Future[Any]) -> None:
if not task.cancelled():
ex = task.exception()
traceback.print_exception(type(ex), ex, ex.__traceback__)
global _should_run
_should_run = False
if ex is not None:
traceback.print_exception(type(ex), ex, ex.__traceback__)
global _should_run
_should_run = False


def _callback(cb_function, sequential, executor):
def _callback(
cb_function: Optional[Callable[[str], Any]],
sequential: bool,
executor: Optional[concurrent.futures.Executor],
):
async def _cb(key):
if cb_function is None:
return
Expand All @@ -440,7 +447,7 @@ async def _cb(key):
if asyncio.iscoroutinefunction(cb_function):
task = asyncio.create_task(cb_function(key))
task.add_done_callback(_done)
else:
elif executor is not None:
future = executor.submit(cb_function, key)
future.add_done_callback(_done)

Expand All @@ -450,7 +457,7 @@ async def _cb(key):
# Raw and _nonblocking inspiration from:
# http://ballingt.com/_nonblocking-stdin-in-python-3/
@contextmanager
def _raw(stream):
def _raw(stream: TextIO) -> Generator[None, None, None]:
# Not required on Windows
if _is_windows:
yield
Expand All @@ -465,7 +472,7 @@ def _raw(stream):


@contextmanager
def _nonblocking(stream):
def _nonblocking(stream: TextIO) -> Generator[None, None, None]:
# Not required on Windows
if _is_windows:
yield
Expand All @@ -480,14 +487,14 @@ def _nonblocking(stream):
fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl)


def _read_char(debug):
def _read_char(debug: bool) -> Optional[str]:
if _is_windows:
return _read_char_win(debug)
else:
return _read_char_unix(debug)


def _read_char_win(debug):
def _read_char_win(debug: bool) -> Optional[str]:
# Return if nothing to read
if not msvcrt.kbhit():
return ""
Expand All @@ -512,7 +519,7 @@ def _read_char_win(debug):
return char


def _read_char_unix(debug):
def _read_char_unix(debug: bool) -> Optional[str]:
char = _read_unix_stdin(1)

# Skip and continue if read failed
Expand All @@ -535,28 +542,32 @@ def _read_char_unix(debug):
return char


def _read_unix_stdin(amount):
def _read_unix_stdin(amount: int) -> Optional[str]:
try:
return sys.stdin.read(amount)
except IOError:
return None


# '\x' at the start is a good indicator for ansi character
def _is_unix_ansi(char):
def _is_unix_ansi(char: str) -> bool:
rep = repr(char)
return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x"


def _read_and_parse_unix_ansi(char):
char += _read_unix_stdin(5)
if char in _UNIX_ANSI_CHAR_TO_READABLE:
return _UNIX_ANSI_CHAR_TO_READABLE[char], char
else:
return None, char
def _read_and_parse_unix_ansi(char: str) -> Tuple[Optional[str], str]:
extra = _read_unix_stdin(5)
if extra is not None:
char += extra
if char in _UNIX_ANSI_CHAR_TO_READABLE:
return _UNIX_ANSI_CHAR_TO_READABLE[char], char
return None, char


async def _react_to_input(state, options):
async def _react_to_input(
state: SimpleNamespace,
options: SimpleNamespace,
) -> SimpleNamespace:
# Read next character
state.current = _read_char(options.debug)

Expand Down