Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/nethsec/conntrack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,28 @@ def __parse_connection_info(flow: Element) -> dict:
return result


def list_connections():
def list_connections(labels: list = None):
"""
List all network connections.

Args:
- labels: optional list of label strings to filter by. Only connections that have
ALL the specified labels are returned. If ``None`` or empty, all
connections are returned.

Returns:
dict of applications and their connections.
list of connections.
"""
result = subprocess.run(["conntrack", "-L", "-o", "labels,xml"], capture_output=True, text=True)
cmd = ["conntrack", "-L", "-o", "labels,xml"]
if labels:
cmd.extend(["-l", ",".join(labels)])
result = subprocess.run(cmd, capture_output=True, text=True)
root = ElementTree.fromstring(result.stdout)
result = []
connections = []
for flow in root.findall('flow'):
result.append(__parse_connection_info(flow))
connections.append(__parse_connection_info(flow))

return result
return connections


def drop_connection(connection_id: str):
Expand Down
116 changes: 115 additions & 1 deletion tests/test_conntrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,86 @@
<id>1915971940</id>
</meta>
</flow>
<flow>
<meta direction="original">
<layer3 protonum="2" protoname="ipv4">
<src>10.0.0.1</src>
<dst>10.0.0.2</dst>
</layer3>
<layer4 protonum="6" protoname="tcp">
<sport>12345</sport>
<dport>80</dport>
</layer4>
<counters>
<packets>5</packets>
<bytes>500</bytes>
</counters>
</meta>
<meta direction="reply">
<layer3 protonum="2" protoname="ipv4">
<src>10.0.0.2</src>
<dst>10.0.0.1</dst>
</layer3>
<layer4 protonum="6" protoname="tcp">
<sport>80</sport>
<dport>12345</dport>
</layer4>
<counters>
<packets>3</packets>
<bytes>300</bytes>
</counters>
</meta>
<meta direction="independent">
<state>ESTABLISHED</state>
<timeout>120</timeout>
<mark>0</mark>
<use>1</use>
<id>1111111111</id>
<labels>
<label>web</label>
<label>trusted</label>
</labels>
</meta>
</flow>
<flow>
<meta direction="original">
<layer3 protonum="2" protoname="ipv4">
<src>10.0.0.3</src>
<dst>10.0.0.4</dst>
</layer3>
<layer4 protonum="17" protoname="udp">
<sport>5353</sport>
<dport>53</dport>
</layer4>
<counters>
<packets>1</packets>
<bytes>60</bytes>
</counters>
</meta>
<meta direction="reply">
<layer3 protonum="2" protoname="ipv4">
<src>10.0.0.4</src>
<dst>10.0.0.3</dst>
</layer3>
<layer4 protonum="17" protoname="udp">
<sport>53</sport>
<dport>5353</dport>
</layer4>
<counters>
<packets>1</packets>
<bytes>60</bytes>
</counters>
</meta>
<meta direction="independent">
<timeout>30</timeout>
<mark>0</mark>
<use>1</use>
<id>2222222222</id>
<labels>
<label>dns</label>
</labels>
</meta>
</flow>
</conntrack>
"""

Expand Down Expand Up @@ -289,7 +369,41 @@ def test_fetch_connection_list(mocker: MockFixture):
process_result.stdout = conntrack_response
mocker.patch('subprocess.run', return_value=process_result)
result = conntrack.list_connections()
assert len(result) == 7
assert len(result) == 9


def test_list_connections_label_filter(mocker: MockFixture):
process_result = mocker.stub('subprocess_return')
process_result.stdout = conntrack_response
subprocess_mock = mocker.patch('subprocess.run', return_value=process_result)

# single label – '-l' flag appended with the label value
conntrack.list_connections(labels=['web'])
subprocess_mock.assert_called_with(
['conntrack', '-L', '-o', 'labels,xml', '-l', 'web'],
capture_output=True, text=True
)

# multiple labels joined by comma
conntrack.list_connections(labels=['web', 'trusted'])
subprocess_mock.assert_called_with(
['conntrack', '-L', '-o', 'labels,xml', '-l', 'web,trusted'],
capture_output=True, text=True
)

# no labels – no '-l' flag
conntrack.list_connections()
subprocess_mock.assert_called_with(
['conntrack', '-L', '-o', 'labels,xml'],
capture_output=True, text=True
)

# empty list – no '-l' flag
conntrack.list_connections(labels=[])
subprocess_mock.assert_called_with(
['conntrack', '-L', '-o', 'labels,xml'],
capture_output=True, text=True
)


def test_drop_connection(mocker: MockFixture):
Expand Down