diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 5efa995c..aadf3318 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -65,6 +65,7 @@ def __init__( # noqa: PLR0913 default_authority: typing.Optional[str] = None, channel_credentials: typing.Optional[grpc.ChannelCredentials] = None, sync_metadata_disabled: typing.Optional[bool] = None, + fatal_status_codes: typing.Optional[list[str]] = None, ): """ Create an instance of the FlagdProvider @@ -111,6 +112,7 @@ def __init__( # noqa: PLR0913 default_authority=default_authority, channel_credentials=channel_credentials, sync_metadata_disabled=sync_metadata_disabled, + fatal_status_codes=fatal_status_codes, ) self.enriched_context: dict = {} diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index e4341d48..472fe864 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -18,6 +18,7 @@ GeneralError, InvalidContextError, ParseError, + ProviderFatalError, ProviderNotReadyError, TypeMismatchError, ) @@ -61,11 +62,13 @@ def __init__( if self.config.cache == CacheType.LRU else None ) + logger.debug(self.config.fatal_status_codes) self.retry_grace_period = config.retry_grace_period self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.connected = False + self._is_fatal = False self.channel = self._generate_channel(config) self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) @@ -151,9 +154,14 @@ def connect(self) -> None: ## block until ready or deadline reached timeout = self.deadline + time.monotonic() while not self.connected and time.monotonic() < timeout: + if self._is_fatal: + break time.sleep(0.05) logger.debug("Finished blocking gRPC state initialization") + if self._is_fatal: + raise ProviderFatalError("Fatal gRPC status code received") + if not self.connected: raise ProviderNotReadyError( "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." @@ -164,6 +172,8 @@ def monitor(self) -> None: def _state_change_callback(self, new_state: ChannelConnectivity) -> None: logger.debug(f"gRPC state change: {new_state}") + if self._is_fatal: + return if ( new_state == grpc.ChannelConnectivity.READY or new_state == grpc.ChannelConnectivity.IDLE @@ -235,6 +245,16 @@ def listen(self) -> None: except grpc.RpcError as e: # noqa: PERF203 # although it seems like this error log is not interesting, without it, the retry is not working as expected logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}") + if e.code().name in self.config.fatal_status_codes: + self._is_fatal = True + self.active = False + self.emit_provider_error( + ProviderEventDetails( + message=f"Fatal gRPC status code: {e.code()}", + error_code=ErrorCode.PROVIDER_FATAL, + ) + ) + return except ParseError: logger.exception( f"Could not parse flag data using flagd syntax: {message=}" @@ -399,8 +419,11 @@ def _resolve( # noqa: PLR0915 C901 except grpc.RpcError as e: code = e.code() message = f"received grpc status code {code}" + logger.debug(message) - if code == grpc.StatusCode.NOT_FOUND: + if code.name in self.config.fatal_status_codes: + raise ProviderFatalError(message) from e + elif code == grpc.StatusCode.NOT_FOUND: raise FlagNotFoundError(message) from e elif code == grpc.StatusCode.INVALID_ARGUMENT: raise TypeMismatchError(message) from e diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 4d9d27ed..c514af20 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -10,7 +10,12 @@ from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails -from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError +from openfeature.exception import ( + ErrorCode, + ParseError, + ProviderFatalError, + ProviderNotReadyError, +) from openfeature.schemas.protobuf.flagd.sync.v1 import ( sync_pb2, sync_pb2_grpc, @@ -50,6 +55,7 @@ def __init__( self.emit_provider_stale = emit_provider_stale self.connected = False + self._is_fatal = False self.thread: typing.Optional[threading.Thread] = None self.timer: typing.Optional[threading.Timer] = None @@ -132,9 +138,14 @@ def connect(self) -> None: ## block until ready or deadline reached timeout = self.deadline + time.monotonic() while not self.connected and time.monotonic() < timeout: + if self._is_fatal: + break time.sleep(0.05) logger.debug("Finished blocking gRPC state initialization") + if self._is_fatal: + raise ProviderFatalError("Fatal gRPC status code received") + if not self.connected: raise ProviderNotReadyError( "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." @@ -145,6 +156,8 @@ def monitor(self) -> None: def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None: logger.debug(f"gRPC state change: {new_state}") + if self._is_fatal: + return if ( new_state == grpc.ChannelConnectivity.READY or new_state == grpc.ChannelConnectivity.IDLE @@ -228,55 +241,69 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]: else: raise e + def _handle_flag_response( + self, + flag_rsp: sync_pb2.SyncFlagsResponse, + context_values_response: typing.Optional[sync_pb2.GetMetadataResponse], + ) -> bool: + """Process a single flag response. Returns True if the loop should terminate.""" + flag_str = flag_rsp.flag_configuration + logger.debug(f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}") + self.flag_store.update(json.loads(flag_str)) + + if not self.connected: + context_values = {} + if flag_rsp.sync_context: + context_values = MessageToDict(flag_rsp.sync_context) + elif context_values_response: + context_values = MessageToDict(context_values_response)["metadata"] + self.emit_provider_ready( + ProviderEventDetails(message="gRPC sync connection established"), + context_values, + ) + self.connected = True + + if not self.active: + logger.debug("Terminating gRPC sync thread") + return True + return False + + def _handle_rpc_error(self, e: grpc.RpcError) -> bool: + """Handle a gRPC RpcError. Returns True if the stream loop should stop.""" + logger.warning(f"SyncFlags stream error, {e.code()=} {e.details()=}") + if e.code().name in self.config.fatal_status_codes: + self._is_fatal = True + self.active = False + self.emit_provider_error( + ProviderEventDetails( + message=f"Fatal gRPC status code: {e.code()}", + error_code=ErrorCode.PROVIDER_FATAL, + ) + ) + return True + return False + def listen(self) -> None: call_args = self.generate_grpc_call_args() - request_args = self._create_request_args() while self.active: try: context_values_response = self._fetch_metadata() - request = sync_pb2.SyncFlagsRequest(**request_args) - logger.debug("Setting up gRPC sync flags connection") for flag_rsp in self.stub.SyncFlags(request, **call_args): - flag_str = flag_rsp.flag_configuration - logger.debug( - f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}" - ) - self.flag_store.update(json.loads(flag_str)) - - context_values = {} - if flag_rsp.sync_context: - context_values = MessageToDict(flag_rsp.sync_context) - elif context_values_response: - context_values = MessageToDict(context_values_response)[ - "metadata" - ] - - if not self.connected: - self.emit_provider_ready( - ProviderEventDetails( - message="gRPC sync connection established" - ), - context_values, - ) - self.connected = True - - if not self.active: - logger.debug("Terminating gRPC sync thread") + if self._handle_flag_response(flag_rsp, context_values_response): return except grpc.RpcError as e: # noqa: PERF203 - logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}") + if self._handle_rpc_error(e): + return except json.JSONDecodeError: logger.exception( - f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}" + "Could not parse JSON flag data from SyncFlags endpoint" ) except ParseError: - logger.exception( - f"Could not parse flag data using flagd syntax: {flag_str=}" - ) + logger.exception("Could not parse flag data using flagd syntax") def generate_grpc_call_args(self) -> GrpcMultiCallableArgs: call_args: GrpcMultiCallableArgs = {"wait_for_ready": True} diff --git a/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py b/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py index bc0dbb28..7b98962b 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py +++ b/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py @@ -1,69 +1,98 @@ -import os.path +import logging +import os import time import typing from pathlib import Path import grpc from grpc_health.v1 import health_pb2, health_pb2_grpc -from testcontainers.core.container import DockerContainer -from testcontainers.core.wait_strategies import LogMessageWaitStrategy +from testcontainers.compose import DockerCompose from openfeature.contrib.provider.flagd.config import ResolverType +logger = logging.getLogger(__name__) + HEALTH_CHECK = 8014 LAUNCHPAD = 8080 +FORBIDDEN = 9212 + + +class FlagdContainer: + """Manages the docker-compose environment for flagd e2e tests. + Uses docker-compose to start both flagd and envoy containers, + so the envoy forbidden endpoint (port 9212) returns a proper HTTP 403. + """ -class FlagdContainer(DockerContainer): def __init__( self, feature: typing.Optional[str] = None, **kwargs, ) -> None: + self._test_harness_dir = ( + Path(__file__).parents[2] / "openfeature" / "test-harness" + ) + self._version = (self._test_harness_dir / "version.txt").read_text().rstrip() + image: str = "ghcr.io/open-feature/flagd-testbed" if feature is not None: image = f"{image}-{feature}" - path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt" - data = path.read_text().rstrip() - super().__init__(f"{image}:v{data}", **kwargs) - self.rpc = 8013 - self.ipr = 8015 + self.flagDir = Path("./flags") self.flagDir.mkdir(parents=True, exist_ok=True) - self.with_exposed_ports(self.rpc, self.ipr, HEALTH_CHECK, LAUNCHPAD) - self.with_volume_mapping(os.path.abspath(self.flagDir.name), "/flags", "rw") - self.waiting_for(LogMessageWaitStrategy("listening").with_startup_timeout(5)) - def get_port(self, resolver_type: ResolverType): + # Set environment variables for docker-compose substitution + os.environ["IMAGE"] = image + os.environ["VERSION"] = f"v{self._version}" + os.environ["FLAGS_DIR"] = str(self.flagDir.absolute()) + + self._compose = DockerCompose( + context=str(self._test_harness_dir), + compose_file_name="docker-compose.yaml", + wait=True, + ) + + def get_port(self, resolver_type: ResolverType) -> int: if resolver_type == ResolverType.RPC: - return self.get_exposed_port(self.rpc) + return self._compose.get_service_port("flagd", 8013) else: - return self.get_exposed_port(self.ipr) + return self._compose.get_service_port("flagd", 8015) - def get_launchpad_url(self): - return f"http://localhost:{self.get_exposed_port(LAUNCHPAD)}" + def get_exposed_port(self, port: int) -> int: + """Get mapped port. For FORBIDDEN (9212) returns envoy port, otherwise flagd port.""" + if port == FORBIDDEN: + return self._compose.get_service_port("envoy", FORBIDDEN) + return self._compose.get_service_port("flagd", port) + + def get_launchpad_url(self) -> str: + port = self._compose.get_service_port("flagd", LAUNCHPAD) + return f"http://localhost:{port}" def start(self) -> "FlagdContainer": - super().start() - self._checker(self.get_container_host_ip(), self.get_exposed_port(HEALTH_CHECK)) + self._compose.start() + host = self._compose.get_service_host("flagd", HEALTH_CHECK) or "localhost" + port = self._compose.get_service_port("flagd", HEALTH_CHECK) + self._checker(host, port) return self + def stop(self) -> None: + self._compose.stop() + def _checker(self, host: str, port: int) -> None: # Give an extra second before continuing time.sleep(1) - # Second we use the GRPC health check endpoint - with grpc.insecure_channel(host + ":" + str(port)) as channel: + # Use the GRPC health check endpoint + with grpc.insecure_channel(f"{host}:{port}") as channel: health_stub = health_pb2_grpc.HealthStub(channel) def health_check_call(stub: health_pb2_grpc.HealthStub): request = health_pb2.HealthCheckRequest() - resp = stub.Check(request) - if resp.status == health_pb2.HealthCheckResponse.SERVING: - return True - elif resp.status == health_pb2.HealthCheckResponse.NOT_SERVING: + try: + resp = stub.Check(request) + return resp.status == health_pb2.HealthCheckResponse.SERVING + except Exception: return False - # Should succeed # Check health status every 1 second for 30 seconds ok = False for _ in range(30): @@ -73,4 +102,4 @@ def health_check_call(stub: health_pb2_grpc.HealthStub): time.sleep(1) if not ok: - raise ConnectionError("flagD not ready in time") + raise ConnectionError("flagd not ready in time") diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py index d087b512..49811e10 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py @@ -4,7 +4,7 @@ from tests.e2e.testfilter import TestFilter resolver = ResolverType.IN_PROCESS -feature_list = ["~targetURI", "~unixsocket", "~deprecated", "~forbidden"] +feature_list = ["~targetURI", "~unixsocket", "~deprecated"] def pytest_collection_modifyitems(config, items): diff --git a/providers/openfeature-provider-flagd/tests/e2e/rpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/rpc/conftest.py index e16f2bdd..8804bb94 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/rpc/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/rpc/conftest.py @@ -10,7 +10,6 @@ "~sync", "~metadata", "~deprecated", - "~forbidden", ] diff --git a/providers/openfeature-provider-flagd/tests/e2e/step/config_steps.py b/providers/openfeature-provider-flagd/tests/e2e/step/config_steps.py index a3b378ce..7da4f8c3 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/step/config_steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/step/config_steps.py @@ -46,8 +46,8 @@ def option_values() -> dict: @given( - parsers.cfparse( - 'an option "{option}" of type "{type_info}" with value "{value}"', + parsers.re( + r'an option "(?P