diff --git a/docs/src/components/Layout.jsx b/docs/src/components/Layout.jsx index efcabcd..9649853 100644 --- a/docs/src/components/Layout.jsx +++ b/docs/src/components/Layout.jsx @@ -18,6 +18,7 @@ const navigation = [ {title: 'Getting started', href: '/'}, {title: 'Validating HTTP Controls', href: '/docs/http'}, {title: 'Validating DNS Controls', href: '/docs/dns'}, + {title: 'Validating TCP Connectivity', href: '/docs/tcp'}, {title: 'Validating K8s Data', href: '/docs/k8s_data'}, ], }, diff --git a/docs/src/pages/docs/tcp.md b/docs/src/pages/docs/tcp.md new file mode 100644 index 0000000..522b091 --- /dev/null +++ b/docs/src/pages/docs/tcp.md @@ -0,0 +1,132 @@ +--- +title: TCP NetworkAssertions +description: TCP NetworkAssertions allow you to test raw TCP connectivity from your cluster. +--- + +This example shows how to write and run a `NetworkAssertion` that checks TCP connectivity +from within a namespace. TCP probes verify that a connection can (or cannot) be established +to a given host and port — the correct primitive for testing connectivity to non-HTTP services +such as databases, caches, and message brokers. + +## TCP NetworkAssertion + +We create a `NetworkAssertion` to verify that a TCP connection to the Kubernetes API +on port 443 succeeds, and that connections to a non-existent service are blocked: + +```yaml +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: tcp-connectivity + namespace: default + annotations: + description: Assert TCP connectivity to expected services +spec: + schedule: "*/10 * * * *" + rules: + - name: tcp-to-k8s-api + type: tcp + host: kubernetes.default.svc + port: 443 + expected: pass + validate: + message: TCP connection to Kubernetes API should succeed. + - name: tcp-to-blocked-port + type: tcp + host: kubernetes.default.svc + port: 9999 + timeout: 3 + expected: fail + validate: + message: TCP connection to non-listening port should fail. 
+``` + +### Parameters + +| Parameter | Description | Default | +| --- | --- | --- | +| `host` | Hostname or IP address to connect to | (required) | +| `port` | TCP port number | (required) | +| `timeout` | Connection timeout in seconds | `5` | +| `expected` | Whether the check should `pass` or `fail` | `pass` | + +## Boundary Protection Example + +TCP probes are ideal for verifying network segmentation and boundary protection. +For example, asserting that a web tier cannot directly reach a database: + +```yaml +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: boundary-protection + namespace: production + annotations: + description: Verify network segmentation between tiers +spec: + schedule: "@hourly" + rules: + - name: api-reachable + type: tcp + host: api.backend + port: 8080 + expected: pass + validate: + message: Web tier should reach the API tier. + - name: database-blocked + type: tcp + host: postgres.database + port: 5432 + expected: fail + validate: + message: Web tier must not directly access database tier. +``` + +## Custom Validation Rules + +You can write custom CEL validation rules to inspect the probe result data: + +```yaml + - name: tcp-with-custom-rule + type: tcp + host: my-service.default.svc + port: 8080 + validate: + pattern: "data.connected == true && data.error == null" + message: TCP connection should succeed with no errors. +``` + +The `data` object contains: +- `connected` (bool) — whether the TCP connection was established +- `error` (string or null) — error message if the connection failed +- `startTimestamp` — ISO 8601 timestamp when the check began +- `endTimestamp` — ISO 8601 timestamp when the check completed + +## Policy Report + +After the `NetworkAssertion` has been applied, a `PolicyReport` will be created with the +results. 
An example `PolicyReport` for a TCP check:

```yaml
apiVersion: wgpolicyk8s.io/v1alpha2
kind: PolicyReport
metadata:
  name: tcp-connectivity
  namespace: default
results:
  - category: tcp
    message: Rule from tcp-to-k8s-api
    policy: tcp-to-k8s-api
    properties:
      data: >-
        {"startTimestamp": "2024-01-15T10:30:00.123456",
        "connected": true, "error": null,
        "endTimestamp": "2024-01-15T10:30:00.234567"}
      spec: >-
        {"type": "tcp", "host": "kubernetes.default.svc",
        "port": 443, "timeout": 5}
    result: pass
    source: netchecks
summary:
  pass: 1
``` diff --git a/netcheck/checks/tcp.py b/netcheck/checks/tcp.py new file mode 100644 index 0000000..0c49d9d --- /dev/null +++ b/netcheck/checks/tcp.py @@ -0,0 +1,44 @@ +import datetime +import logging +import socket + +logger = logging.getLogger("netcheck.tcp") +DEFAULT_TCP_VALIDATION_RULE = """ +data.connected == true +""" + + +def tcp_check(host: str, port: int, timeout: float = 5) -> dict: + test_spec = { + "type": "tcp", + "host": host, + "port": port, + "timeout": timeout, + } + + result_data = { + "startTimestamp": datetime.datetime.utcnow().isoformat(), + } + + output = {"spec": test_spec, "data": result_data} + + try: + with socket.create_connection((host, port), timeout=timeout): + result_data["connected"] = True + result_data["error"] = None + except socket.timeout: + logger.debug(f"TCP connection to {host}:{port} timed out") + result_data["connected"] = False + result_data["error"] = f"Connection timed out after {timeout}s" + except ConnectionRefusedError: + logger.debug(f"TCP connection to {host}:{port} refused") + result_data["connected"] = False + result_data["error"] = f"Connection refused to {host}:{port}" + except OSError as e: + logger.debug(f"TCP connection to {host}:{port} failed: {e}") + result_data["connected"] = False + result_data["error"] = str(e) + + result_data["endTimestamp"] = datetime.datetime.utcnow().isoformat() + + return output diff --git a/netcheck/cli.py 
index 5be7bf7..c64c59d 100644 --- a/netcheck/cli.py +++ b/netcheck/cli.py @@ -8,6 +8,7 @@ from typing import List, Optional from netcheck.checks.dns import DEFAULT_DNS_VALIDATION_RULE +from netcheck.checks.tcp import DEFAULT_TCP_VALIDATION_RULE from netcheck.checks.http import NetcheckHttpMethod from netcheck.runner import run_from_config, check_individual_assertion from netcheck.version import NETCHECK_VERSION @@ -28,6 +29,7 @@ class NetcheckOutputType(str, Enum): class NetcheckTestType(str, Enum): dns = "dns" http = "http" + tcp = "tcp" internal = "internal" @@ -180,6 +182,45 @@ def dns( output_result(result, should_fail, verbose) +@app.command() +def tcp( + host: str = typer.Option("github.com", help="Host to connect to", rich_help_panel="tcp test"), + port: int = typer.Option(443, help="Port to connect to", rich_help_panel="tcp test"), + timeout: float = typer.Option(5.0, "-t", "--timeout", help="Timeout in seconds"), + should_fail: bool = typer.Option(False, "--should-fail/--should-pass"), + validation_rule: str = typer.Option(None, "--validation-rule", help="Validation rule in CEL to apply to result"), + verbose: bool = typer.Option(False, "-v", "--verbose"), +): + """Carry out a tcp connectivity check""" + + test_config = { + "host": host, + "port": port, + "timeout": timeout, + "expected": "fail" if should_fail else None, + } + if verbose: + err_console.print("netcheck tcp") + err_console.print("Options") + err_console.print_json(data=test_config) + + if validation_rule is None: + validation_rule = DEFAULT_TCP_VALIDATION_RULE + else: + err_console.print("Validating result against custom validation rule") + + result = check_individual_assertion( + NetcheckTestType.tcp, + test_config, + err_console, + validation_rule=validation_rule, + verbose=verbose, + include_context=True, + ) + + output_result(result, should_fail, verbose) + + def notify_for_unexpected_test_result(failed, should_fail, verbose=False): if verbose: if failed: diff --git a/netcheck/runner.py 
b/netcheck/runner.py index a947986..d97898b 100644 --- a/netcheck/runner.py +++ b/netcheck/runner.py @@ -11,6 +11,7 @@ from netcheck.checks.internal import internal_check from netcheck.checks.dns import dns_lookup_check, DEFAULT_DNS_VALIDATION_RULE from netcheck.checks.http import http_request_check, DEFAULT_HTTP_VALIDATION_RULE +from netcheck.checks.tcp import tcp_check, DEFAULT_TCP_VALIDATION_RULE from netcheck.context import replace_template, LazyFileLoadingDict logger = logging.getLogger("netcheck.runner") @@ -108,6 +109,14 @@ def check_individual_assertion( timeout=test_config.get("timeout"), verify=test_config.get("verify-tls-cert", True), ) + case "tcp": + if verbose: + err_console.print(f"TCP check connecting to {test_config['host']}:{test_config['port']}") + test_detail = tcp_check( + host=test_config["host"], + port=int(test_config["port"]), + timeout=test_config.get("timeout", 5), + ) case "internal": if verbose: err_console.print(f"Internal check with command '{test_config['command']}'") @@ -125,6 +134,8 @@ def check_individual_assertion( validation_rule = DEFAULT_HTTP_VALIDATION_RULE case "dns": validation_rule = DEFAULT_DNS_VALIDATION_RULE + case "tcp": + validation_rule = DEFAULT_TCP_VALIDATION_RULE case "internal": validation_rule = "true" case _: diff --git a/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-assertion.yaml b/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-assertion.yaml new file mode 100644 index 0000000..7229d12 --- /dev/null +++ b/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-assertion.yaml @@ -0,0 +1,24 @@ +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: tcp-egress-restrictions-should-work + annotations: + description: Verify TCP egress restrictions are enforced by Cilium network policy +spec: + schedule: "*/1 * * * *" + rules: + - name: tcp-to-k8s-api-allowed + type: tcp + host: kubernetes.default.svc + port: 443 + expected: pass + validate: + message: TCP to Kubernetes 
API port 443 should be allowed by policy. + - name: tcp-to-blocked-port + type: tcp + host: kubernetes.default.svc + port: 8080 + timeout: 5 + expected: fail + validate: + message: TCP to port 8080 should be blocked by Cilium egress policy. diff --git a/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-netpol.yaml b/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-netpol.yaml new file mode 100644 index 0000000..e28cf1a --- /dev/null +++ b/operator/examples/cilium-tcp-egress-restrictions/tcp-egress-netpol.yaml @@ -0,0 +1,28 @@ +apiVersion: "cilium.io/v2" +kind: CiliumNetworkPolicy +metadata: + name: restrict-tcp-egress +spec: + endpointSelector: {} + egress: + # Allow DNS to kube-dns (required for name resolution) + - toEndpoints: + - matchLabels: + "k8s:io.kubernetes.pod.namespace": kube-system + "k8s:k8s-app": kube-dns + toPorts: + - ports: + - port: "53" + protocol: ANY + rules: + dns: + - matchPattern: "*" + # Allow TCP to the Kubernetes API (port 443) + - toServices: + - k8sService: + serviceName: kubernetes + namespace: default + toPorts: + - ports: + - port: "443" + protocol: TCP diff --git a/operator/examples/compliance/soc2-boundary-protection.yaml b/operator/examples/compliance/soc2-boundary-protection.yaml new file mode 100644 index 0000000..98dec16 --- /dev/null +++ b/operator/examples/compliance/soc2-boundary-protection.yaml @@ -0,0 +1,30 @@ +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: boundary-protection-web-tier + namespace: production + annotations: + netchecks.io/controls: "soc2/CC6.6, soc2/CC6.7" + netchecks.io/description: > + Verify web tier boundary protection is effective. Web pods + should reach the API tier but not the database tier directly. + SOC 2 CC6.6 requires boundary protection mechanisms to be + in place and monitored. 
+ netchecks.io/severity: high +spec: + schedule: "@hourly" + rules: + - name: web-to-api-allowed + type: tcp + host: api.backend + port: 8080 + expected: pass + validate: + message: Web tier should reach the API tier + - name: web-to-db-blocked + type: tcp + host: postgres.database + port: 5432 + expected: fail + validate: + message: Web tier must not directly access database tier diff --git a/operator/examples/default-k8s/tcp.yaml b/operator/examples/default-k8s/tcp.yaml new file mode 100644 index 0000000..dc022f8 --- /dev/null +++ b/operator/examples/default-k8s/tcp.yaml @@ -0,0 +1,25 @@ +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: tcp-should-work + namespace: default + annotations: + description: Assert pod can establish TCP connections to expected services. +spec: + schedule: "*/10 * * * *" + rules: + - name: tcp-to-k8s-api + type: tcp + host: kubernetes.default.svc + port: 443 + expected: pass + validate: + message: TCP connection to Kubernetes API should succeed. + - name: tcp-to-blocked-port + type: tcp + host: kubernetes.default.svc + port: 9999 + timeout: 3 + expected: fail + validate: + message: TCP connection to non-listening port should fail. 
diff --git a/operator/tests/test_tcp_cilium_controls.py b/operator/tests/test_tcp_cilium_controls.py new file mode 100644 index 0000000..46ca59a --- /dev/null +++ b/operator/tests/test_tcp_cilium_controls.py @@ -0,0 +1,119 @@ +import json +import os +import time +import subprocess + +import pytest + + +INCLUDE_CILIUM_TESTS = os.getenv("INCLUDE_CILIUM_TESTS") + + +@pytest.mark.skipif(INCLUDE_CILIUM_TESTS is None, reason="Cilium is not installed") +def test_tcp_egress_with_cilium_policy(netchecks, k8s_namespace, example_dir_path): + tcp_restrictions_dir = example_dir_path("cilium-tcp-egress-restrictions") + + # Apply the Cilium network policy first and let it propagate + subprocess.run( + f"kubectl apply -n {k8s_namespace} -f {tcp_restrictions_dir}/tcp-egress-netpol.yaml", + shell=True, + check=True, + ) + time.sleep(5) + + # Apply the NetworkAssertion + subprocess.run( + f"kubectl apply -n {k8s_namespace} -f {tcp_restrictions_dir}/tcp-egress-assertion.yaml", + shell=True, + check=True, + ) + + # Wait for the CronJob to be created + for i in range(10): + cronjobs_response = subprocess.run( + f"kubectl get cronjobs -n {k8s_namespace}", + shell=True, + check=True, + capture_output=True, + ) + if b"tcp-egress-restrictions-should-work" in cronjobs_response.stdout: + break + time.sleep(1.5**i) + + # Wait for a Job to be created from the CronJob + for i in range(10): + jobs_response = subprocess.run( + f"kubectl get jobs -n {k8s_namespace} -l app.kubernetes.io/instance=tcp-egress-restrictions-should-work", + shell=True, + check=True, + capture_output=True, + ) + if b"tcp-egress-restrictions-should-work" in jobs_response.stdout: + break + time.sleep(1.5**i) + + # Wait for the job to complete + subprocess.run( + f"kubectl wait Job -l app.kubernetes.io/instance=tcp-egress-restrictions-should-work -n {k8s_namespace} --for condition=complete --timeout=120s", + shell=True, + check=True, + ) + + # Wait for the PolicyReport to appear + for i in range(20): + policy_report_response 
= subprocess.run( + f"kubectl get policyreports -n {k8s_namespace}", + shell=True, + check=True, + capture_output=True, + ) + if b"tcp-egress-restrictions-should-work" in policy_report_response.stdout: + break + time.sleep(1.3**i) + assert b"tcp-egress-restrictions-should-work" in policy_report_response.stdout + + # Validate the summary: both rules should pass (one expected pass, one expected fail) + summary_filter = "jsonpath='{.summary}'" + policy_report_summary = subprocess.run( + f"""kubectl get policyreport/tcp-egress-restrictions-should-work -n {k8s_namespace} -o {summary_filter}""", + shell=True, + check=True, + capture_output=True, + ) + policy_report_summary = json.loads(policy_report_summary.stdout) + assert policy_report_summary["pass"] >= 2 + + # Validate the detailed results + results_filter = "jsonpath='{.results}'" + policy_report_results_response = subprocess.run( + f"""kubectl get policyreport/tcp-egress-restrictions-should-work -n {k8s_namespace} -o {results_filter}""", + shell=True, + check=True, + capture_output=True, + ) + policy_report_results = json.loads(policy_report_results_response.stdout) + assert len(policy_report_results) == 2 + + for result in policy_report_results: + assert result["category"] == "tcp" + assert result["result"] == "pass", str(result) + assert result["source"] == "netchecks" + + test_spec = json.loads(result["properties"]["spec"]) + assert test_spec["type"] == "tcp" + + test_data = json.loads(result["properties"]["data"]) + if result["policy"] == "tcp-to-k8s-api-allowed": + # Allowed by policy - connection should succeed + assert test_data["connected"] is True + elif result["policy"] == "tcp-to-blocked-port": + # Blocked by Cilium policy - connection should fail + assert test_data["connected"] is False + assert test_data["error"] is not None + + # Clean up + subprocess.run( + f"kubectl delete -n {k8s_namespace} -f {tcp_restrictions_dir} --timeout=30s", + shell=True, + check=True, + ) diff --git 
a/operator/tests/test_tcp_default_cluster.py b/operator/tests/test_tcp_default_cluster.py new file mode 100644 index 0000000..5f90196 --- /dev/null +++ b/operator/tests/test_tcp_default_cluster.py @@ -0,0 +1,88 @@ +import json +import time +import subprocess + + +def test_tcp_connectivity_with_installed_operator(netchecks, k8s_namespace, test_file_path): + tcp_assertion_manifest = test_file_path("tcp-job.yaml") + + subprocess.run( + f"kubectl apply -n {k8s_namespace} -f {tcp_assertion_manifest}", + shell=True, + check=True, + ) + + # Assert that a Job gets created in the same namespace + for i in range(10): + jobs_response = subprocess.run( + f"kubectl get jobs -n {k8s_namespace}", + shell=True, + check=True, + capture_output=True, + ) + if b"tcp-should-work" in jobs_response.stdout: + break + time.sleep(1.5**i) + + # Wait for the job to complete + subprocess.run( + f"kubectl wait Job/tcp-should-work -n {k8s_namespace} --for condition=complete --timeout=120s", + shell=True, + check=True, + ) + + # Assert that a PolicyReport gets created in the same namespace + for i in range(10): + policy_report_response = subprocess.run( + f"kubectl get policyreports -n {k8s_namespace}", + shell=True, + check=True, + capture_output=True, + ) + if b"tcp-should-work" in policy_report_response.stdout: + break + time.sleep(1.5**i) + assert b"tcp-should-work" in policy_report_response.stdout + + summary_filter = "jsonpath='{.summary}'" + policy_report_summary = subprocess.run( + f"""kubectl get policyreport/tcp-should-work -n {k8s_namespace} -o {summary_filter}""", + shell=True, + check=True, + capture_output=True, + ) + policy_report_summary = json.loads(policy_report_summary.stdout) + + assert policy_report_summary["pass"] >= 2 + + # Now get the detailed policy report results + results_filter = "jsonpath='{.results}'" + policy_report_results_response = subprocess.run( + f"""kubectl get policyreport/tcp-should-work -n {k8s_namespace} -o {results_filter}""", + shell=True, + check=True, 
+ capture_output=True, + ) + policy_report_results = json.loads(policy_report_results_response.stdout) + + for result in policy_report_results: + assert result["category"] == "tcp" + assert result["result"] == "pass" + assert result["source"] == "netchecks" + + test_spec = json.loads(result["properties"]["spec"]) + assert test_spec["type"] == "tcp" + + test_data = json.loads(result["properties"]["data"]) + if result["policy"] == "tcp-to-k8s-api": + assert test_data["connected"] is True + elif result["policy"] == "tcp-to-nonexistent": + assert test_data["connected"] is False + + # Delete the network assertion + subprocess.run( + f"kubectl delete -n {k8s_namespace} -f {tcp_assertion_manifest} --timeout=30s", + shell=True, + check=True, + ) + time.sleep(3.0) diff --git a/operator/tests/testdata/tcp-job.yaml b/operator/tests/testdata/tcp-job.yaml new file mode 100644 index 0000000..c2be004 --- /dev/null +++ b/operator/tests/testdata/tcp-job.yaml @@ -0,0 +1,27 @@ +apiVersion: netchecks.io/v1 +kind: NetworkAssertion +metadata: + name: tcp-should-work + annotations: + description: Assert pod can establish TCP connections +spec: + template: + metadata: + labels: + optional-label: applied-to-test-pod + rules: + - name: tcp-to-k8s-api + type: tcp + host: kubernetes.default.svc + port: 443 + expected: pass + validate: + message: TCP connection to Kubernetes API should succeed. + - name: tcp-to-nonexistent + type: tcp + host: nonexistent-service.default.svc + port: 9999 + timeout: 3 + expected: fail + validate: + message: TCP connection to non-existent service should fail. 
diff --git a/tests/conftest.py b/tests/conftest.py index a761927..1447ebb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -61,3 +61,8 @@ def valid_config_unexpected_fail_filename(): @fixture() def http_headers_config_filename(): return os.path.join(TEST_DATA_DIR, "http-with-headers.json") + + +@fixture() +def tcp_config_filename(): + return os.path.join(TEST_DATA_DIR, "tcp-config.json") diff --git a/tests/test_cli.py b/tests/test_cli.py index 28b61c6..c8115e5 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -316,3 +316,66 @@ def test_run_internal_config_default(internal_config_filename): assert expected_pass_result["status"] == "pass" assert expected_fail_result["status"] == "fail" + + +def test_default_tcp_check(): + result = runner.invoke(app, ["tcp"]) + assert result.exit_code == 0 + assert len(result.stderr) == 0 + data = json.loads(result.stdout) + assert data["status"] == "pass" + assert data["spec"]["type"] == "tcp" + assert data["data"]["connected"] is True + + +def test_verbose_tcp_check(): + result = runner.invoke(app, ["tcp", "-v"]) + assert result.exit_code == 0 + assert "TCP" in result.stderr + assert "Passed" in result.stderr + data = json.loads(result.stdout) + assert data["status"] == "pass" + + +def test_tcp_check_should_fail(): + result = runner.invoke(app, ["tcp", "--should-fail"]) + assert result.exit_code == 0 + data = json.loads(result.stdout) + assert data["status"] == "fail" + + +def test_tcp_check_connection_refused(): + result = runner.invoke(app, ["tcp", "--host", "localhost", "--port", "1"]) + assert result.exit_code == 0 + data = json.loads(result.stdout) + assert data["data"]["connected"] is False + assert data["data"]["error"] is not None + assert data["status"] == "fail" + + +def test_tcp_check_with_custom_validation(): + result = runner.invoke( + app, + [ + "tcp", + "--validation-rule", + "data.connected == true && data.error == null", + ], + ) + assert result.exit_code == 0 + data = json.loads(result.stdout) + 
assert data["status"] == "pass" + + +def test_run_tcp_config(tcp_config_filename): + result = runner.invoke(app, ["run", "--config", tcp_config_filename]) + assert result.exit_code == 0 + data = json.loads(result.stdout) + + tcp_pass = data["assertions"][0]["results"][0] + assert tcp_pass["status"] == "pass" + assert tcp_pass["spec"]["type"] == "tcp" + assert tcp_pass["data"]["connected"] is True + + tcp_fail = data["assertions"][1]["results"][0] + assert tcp_fail["status"] == "pass" # expected: fail + connection refused = pass diff --git a/tests/testdata/tcp-config.json b/tests/testdata/tcp-config.json new file mode 100644 index 0000000..54a16bf --- /dev/null +++ b/tests/testdata/tcp-config.json @@ -0,0 +1,10 @@ +{ + "assertions": [ + {"name": "tcp-pass", "rules": [ + {"type": "tcp", "host": "github.com", "port": 443} + ]}, + {"name": "tcp-expected-fail", "rules": [ + {"type": "tcp", "host": "localhost", "port": 1, "expected": "fail"} + ]} + ] +}