Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion pytest_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,51 @@ def host_from_connect_args(args):
return host_from_address(address)


def is_ipaddress(address: str):
"""
Determine if the address is a valid IPv4 address.
"""
try:
socket.inet_aton(address)
return True
except socket.error:
return False


def resolve_hostname(hostname):
try:
return socket.gethostbyname(hostname)
except socket.gaierror:
return None


def treat_allowed(allowed):
allowed_hosts = []
for allow in allowed:
allow = allow.strip()
if is_ipaddress(allow):
allowed_hosts.append(allow)

resolved = resolve_hostname(allow)
if resolved:
allowed_hosts.append(resolved)
return allowed_hosts


def socket_allow_hosts(allowed=None):
""" disable socket.socket.connect() to disable the Internet. useful in testing.
"""
if isinstance(allowed, str):
allowed = allowed.split(',')

if not isinstance(allowed, list):
return

allowed_hosts = treat_allowed(allowed)

def guarded_connect(inst, *args):
host = host_from_connect_args(args)
if host and host in allowed:
if host and host in allowed_hosts:
return _true_connect(inst, *args)
raise SocketConnectBlockedError(allowed, host)

Expand Down
8 changes: 8 additions & 0 deletions tests/test_restrict_hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ def test_single_cli_arg_connect_enabled(assert_connect):
assert_connect(True, cli_arg=localhost)


def test_single_cli_arg_connect_enabled_hostname_resolved(assert_connect):
assert_connect(True, cli_arg="localhost")


def test_single_cli_arg_connect_enabled_hostname_unresolvable(assert_connect):
assert_connect(False, cli_arg="unresolvable")


def test_single_cli_arg_connect_unicode_enabled(assert_connect):
assert_connect(True, cli_arg=localhost, code_template=connect_unicode_code_template)

Expand Down