diff --git a/src/nethsec/conntrack/__init__.py b/src/nethsec/conntrack/__init__.py
index 9baabd68..8712432f 100644
--- a/src/nethsec/conntrack/__init__.py
+++ b/src/nethsec/conntrack/__init__.py
@@ -65,20 +65,28 @@ def __parse_connection_info(flow: Element) -> dict:
return result
-def list_connections():
+def list_connections(labels: list = None):
"""
List all network connections.
+ Args:
+ - labels: optional list of label strings to filter by. Only connections that have
+ ALL the specified labels are returned. If ``None`` or empty, all
+ connections are returned.
+
Returns:
- dict of applications and their connections.
+ list of connections.
"""
- result = subprocess.run(["conntrack", "-L", "-o", "labels,xml"], capture_output=True, text=True)
+ cmd = ["conntrack", "-L", "-o", "labels,xml"]
+ if labels:
+ cmd.extend(["-l", ",".join(labels)])
+ result = subprocess.run(cmd, capture_output=True, text=True)
root = ElementTree.fromstring(result.stdout)
- result = []
+ connections = []
for flow in root.findall('flow'):
- result.append(__parse_connection_info(flow))
+ connections.append(__parse_connection_info(flow))
- return result
+ return connections
def drop_connection(connection_id: str):
diff --git a/tests/test_conntrack.py b/tests/test_conntrack.py
index e9138f19..c1a5cf64 100644
--- a/tests/test_conntrack.py
+++ b/tests/test_conntrack.py
@@ -257,6 +257,86 @@
1915971940
+
+
+
+ 10.0.0.1
+ 10.0.0.2
+
+
+ 12345
+ 80
+
+
+ 5
+ 500
+
+
+
+
+ 10.0.0.2
+ 10.0.0.1
+
+
+ 80
+ 12345
+
+
+ 3
+ 300
+
+
+
+ ESTABLISHED
+ 120
+ 0
+
+ 1111111111
+
+
+
+
+
+
+
+
+
+ 10.0.0.3
+ 10.0.0.4
+
+
+ 5353
+ 53
+
+
+ 1
+ 60
+
+
+
+
+ 10.0.0.4
+ 10.0.0.3
+
+
+ 53
+ 5353
+
+
+ 1
+ 60
+
+
+
+ 30
+ 0
+
+ 2222222222
+
+
+
+
+
"""
@@ -289,7 +369,41 @@ def test_fetch_connection_list(mocker: MockFixture):
process_result.stdout = conntrack_response
mocker.patch('subprocess.run', return_value=process_result)
result = conntrack.list_connections()
- assert len(result) == 7
+ assert len(result) == 9
+
+
+def test_list_connections_label_filter(mocker: MockFixture):
+ process_result = mocker.stub('subprocess_return')
+ process_result.stdout = conntrack_response
+ subprocess_mock = mocker.patch('subprocess.run', return_value=process_result)
+
+ # single label – '-l' flag appended with the label value
+ conntrack.list_connections(labels=['web'])
+ subprocess_mock.assert_called_with(
+ ['conntrack', '-L', '-o', 'labels,xml', '-l', 'web'],
+ capture_output=True, text=True
+ )
+
+ # multiple labels joined by comma
+ conntrack.list_connections(labels=['web', 'trusted'])
+ subprocess_mock.assert_called_with(
+ ['conntrack', '-L', '-o', 'labels,xml', '-l', 'web,trusted'],
+ capture_output=True, text=True
+ )
+
+ # no labels – no '-l' flag
+ conntrack.list_connections()
+ subprocess_mock.assert_called_with(
+ ['conntrack', '-L', '-o', 'labels,xml'],
+ capture_output=True, text=True
+ )
+
+ # empty list – no '-l' flag
+ conntrack.list_connections(labels=[])
+ subprocess_mock.assert_called_with(
+ ['conntrack', '-L', '-o', 'labels,xml'],
+ capture_output=True, text=True
+ )
def test_drop_connection(mocker: MockFixture):