Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__( # noqa: PLR0913
default_authority: typing.Optional[str] = None,
channel_credentials: typing.Optional[grpc.ChannelCredentials] = None,
sync_metadata_disabled: typing.Optional[bool] = None,
fatal_status_codes: typing.Optional[list[str]] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__( # noqa: PLR0913
default_authority=default_authority,
channel_credentials=channel_credentials,
sync_metadata_disabled=sync_metadata_disabled,
fatal_status_codes=fatal_status_codes,
)
self.enriched_context: dict = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
GeneralError,
InvalidContextError,
ParseError,
ProviderFatalError,
ProviderNotReadyError,
TypeMismatchError,
)
Expand Down Expand Up @@ -61,11 +62,13 @@ def __init__(
if self.config.cache == CacheType.LRU
else None
)
logger.debug(self.config.fatal_status_codes)

self.retry_grace_period = config.retry_grace_period
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False
self._is_fatal = False
self.channel = self._generate_channel(config)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

Expand Down Expand Up @@ -151,9 +154,14 @@ def connect(self) -> None:
## block until ready or deadline reached
timeout = self.deadline + time.monotonic()
while not self.connected and time.monotonic() < timeout:
if self._is_fatal:
raise ProviderFatalError("Fatal gRPC status code received")
time.sleep(0.05)
logger.debug("Finished blocking gRPC state initialization")

if self._is_fatal:
raise ProviderFatalError("Fatal gRPC status code received")

if not self.connected:
raise ProviderNotReadyError(
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
Expand All @@ -164,6 +172,8 @@ def monitor(self) -> None:

def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if self._is_fatal:
return
if (
new_state == grpc.ChannelConnectivity.READY
or new_state == grpc.ChannelConnectivity.IDLE
Expand Down Expand Up @@ -195,6 +205,8 @@ def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
self.connected = False

def emit_error(self) -> None:
if self._is_fatal:
return
logger.debug("gRPC error emitted")
if self.cache:
self.cache.clear()
Expand Down Expand Up @@ -235,6 +247,16 @@ def listen(self) -> None:
except grpc.RpcError as e: # noqa: PERF203
# although it seems like this error log is not interesting, without it, the retry is not working as expected
logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
if e.code().name in self.config.fatal_status_codes:
self._is_fatal = True
self.active = False
self.emit_provider_error(
ProviderEventDetails(
message=f"Fatal gRPC status code: {e.code()}",
error_code=ErrorCode.PROVIDER_FATAL,
)
)
return
except ParseError:
logger.exception(
f"Could not parse flag data using flagd syntax: {message=}"
Expand Down Expand Up @@ -399,8 +421,11 @@ def _resolve( # noqa: PLR0915 C901
except grpc.RpcError as e:
code = e.code()
message = f"received grpc status code {code}"
logger.debug(message)

if code == grpc.StatusCode.NOT_FOUND:
if code.name in self.config.fatal_status_codes:
raise ProviderFatalError(message) from e
elif code == grpc.StatusCode.NOT_FOUND:
raise FlagNotFoundError(message) from e
elif code == grpc.StatusCode.INVALID_ARGUMENT:
raise TypeMismatchError(message) from e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
from openfeature.exception import (
ErrorCode,
ParseError,
ProviderFatalError,
ProviderNotReadyError,
)
from openfeature.schemas.protobuf.flagd.sync.v1 import (
sync_pb2,
sync_pb2_grpc,
Expand Down Expand Up @@ -50,6 +55,7 @@ def __init__(
self.emit_provider_stale = emit_provider_stale

self.connected = False
self._is_fatal = False
self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

Expand Down Expand Up @@ -132,9 +138,14 @@ def connect(self) -> None:
## block until ready or deadline reached
timeout = self.deadline + time.monotonic()
while not self.connected and time.monotonic() < timeout:
if self._is_fatal:
raise ProviderFatalError("Fatal gRPC status code received")
time.sleep(0.05)
logger.debug("Finished blocking gRPC state initialization")

if self._is_fatal:
raise ProviderFatalError("Fatal gRPC status code received")

if not self.connected:
raise ProviderNotReadyError(
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
Expand All @@ -145,6 +156,8 @@ def monitor(self) -> None:

def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if self._is_fatal:
return
if (
new_state == grpc.ChannelConnectivity.READY
or new_state == grpc.ChannelConnectivity.IDLE
Expand Down Expand Up @@ -176,6 +189,8 @@ def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
self.connected = False

def emit_error(self) -> None:
if self._is_fatal:
return
logger.debug("gRPC error emitted")
self.emit_provider_error(
ProviderEventDetails(
Expand Down Expand Up @@ -228,55 +243,69 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
else:
raise e

def _handle_flag_response(
    self,
    flag_rsp: sync_pb2.SyncFlagsResponse,
    context_values_response: typing.Optional[sync_pb2.GetMetadataResponse],
) -> bool:
    """Process a single flag response from the SyncFlags stream.

    Updates the local flag store with the received configuration and, on the
    first successful sync, emits a PROVIDER_READY event carrying the sync
    context values.

    :param flag_rsp: one SyncFlagsResponse message from the gRPC stream.
    :param context_values_response: metadata fetched before the stream was
        opened; used as a fallback context source when the response carries
        no sync_context.
    :return: True if the stream loop should terminate (provider no longer
        active), False to keep consuming the stream.
    """
    flag_str = flag_rsp.flag_configuration
    # Short numeric fingerprint so the log line stays compact.
    # NOTE(review): str hash is randomized per process (PYTHONHASHSEED),
    # so this value is only comparable within a single run.
    logger.debug(f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}")
    # May raise json.JSONDecodeError or ParseError; both are handled by the caller.
    self.flag_store.update(json.loads(flag_str))

    if not self.connected:
        # First successful sync: build the enriched context and signal readiness.
        # sync_context embedded in the stream response takes precedence over
        # the metadata fetched via GetMetadata before the stream started.
        context_values = {}
        if flag_rsp.sync_context:
            context_values = MessageToDict(flag_rsp.sync_context)
        elif context_values_response:
            # Assumes the converted GetMetadataResponse always contains a
            # "metadata" key — TODO confirm against the flagd sync schema.
            context_values = MessageToDict(context_values_response)["metadata"]
        self.emit_provider_ready(
            ProviderEventDetails(message="gRPC sync connection established"),
            context_values,
        )
        self.connected = True

    if not self.active:
        # Provider was shut down while we were processing; stop the stream loop.
        logger.debug("Terminating gRPC sync thread")
        return True
    return False

def _handle_rpc_error(self, e: grpc.RpcError) -> bool:
    """Handle a gRPC RpcError raised by the SyncFlags stream.

    Non-fatal status codes are only logged, letting the outer loop retry the
    stream. A status listed in the configured fatal_status_codes permanently
    disables the provider and emits a PROVIDER_FATAL error event.

    :param e: the RpcError caught from the stream iteration.
    :return: True if the stream loop should stop, False to allow a retry.
    """
    logger.warning(f"SyncFlags stream error, {e.code()=} {e.details()=}")
    status = e.code()
    if status.name not in self.config.fatal_status_codes:
        # Recoverable: the caller's loop will re-establish the stream.
        return False
    # Fatal: mark the provider dead, stop the sync loop, notify listeners.
    self._is_fatal = True
    self.active = False
    event_details = ProviderEventDetails(
        message=f"Fatal gRPC status code: {status}",
        error_code=ErrorCode.PROVIDER_FATAL,
    )
    self.emit_provider_error(event_details)
    return True

def listen(self) -> None:
call_args = self.generate_grpc_call_args()

request_args = self._create_request_args()

while self.active:
try:
context_values_response = self._fetch_metadata()

request = sync_pb2.SyncFlagsRequest(**request_args)

logger.debug("Setting up gRPC sync flags connection")
for flag_rsp in self.stub.SyncFlags(request, **call_args):
flag_str = flag_rsp.flag_configuration
logger.debug(
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
)
self.flag_store.update(json.loads(flag_str))

context_values = {}
if flag_rsp.sync_context:
context_values = MessageToDict(flag_rsp.sync_context)
elif context_values_response:
context_values = MessageToDict(context_values_response)[
"metadata"
]

if not self.connected:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
),
context_values,
)
self.connected = True

if not self.active:
logger.debug("Terminating gRPC sync thread")
if self._handle_flag_response(flag_rsp, context_values_response):
return
except grpc.RpcError as e: # noqa: PERF203
logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
if self._handle_rpc_error(e):
return
except json.JSONDecodeError:
logger.exception(
f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
"Could not parse JSON flag data from SyncFlags endpoint"
)
except ParseError:
logger.exception(
f"Could not parse flag data using flagd syntax: {flag_str=}"
)
logger.exception("Could not parse flag data using flagd syntax")

def generate_grpc_call_args(self) -> GrpcMultiCallableArgs:
call_args: GrpcMultiCallableArgs = {"wait_for_ready": True}
Expand Down
83 changes: 56 additions & 27 deletions providers/openfeature-provider-flagd/tests/e2e/flagd_container.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,98 @@
import os.path
import logging
import os
import time
import typing
from pathlib import Path

import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc
from testcontainers.core.container import DockerContainer
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
from testcontainers.compose import DockerCompose

from openfeature.contrib.provider.flagd.config import ResolverType

logger = logging.getLogger(__name__)

HEALTH_CHECK = 8014
LAUNCHPAD = 8080
FORBIDDEN = 9212


class FlagdContainer:
"""Manages the docker-compose environment for flagd e2e tests.

Uses docker-compose to start both flagd and envoy containers,
so the envoy forbidden endpoint (port 9212) returns a proper HTTP 403.
"""

class FlagdContainer(DockerContainer):
def __init__(
self,
feature: typing.Optional[str] = None,
**kwargs,
) -> None:
self._test_harness_dir = (
Path(__file__).parents[2] / "openfeature" / "test-harness"
)
self._version = (self._test_harness_dir / "version.txt").read_text().rstrip()

image: str = "ghcr.io/open-feature/flagd-testbed"
if feature is not None:
image = f"{image}-{feature}"
path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt"
data = path.read_text().rstrip()
super().__init__(f"{image}:v{data}", **kwargs)
self.rpc = 8013
self.ipr = 8015

self.flagDir = Path("./flags")
self.flagDir.mkdir(parents=True, exist_ok=True)
self.with_exposed_ports(self.rpc, self.ipr, HEALTH_CHECK, LAUNCHPAD)
self.with_volume_mapping(os.path.abspath(self.flagDir.name), "/flags", "rw")
self.waiting_for(LogMessageWaitStrategy("listening").with_startup_timeout(5))

def get_port(self, resolver_type: ResolverType):
# Set environment variables for docker-compose substitution
os.environ["IMAGE"] = image
os.environ["VERSION"] = f"v{self._version}"
os.environ["FLAGS_DIR"] = str(self.flagDir.absolute())

self._compose = DockerCompose(
context=str(self._test_harness_dir),
compose_file_name="docker-compose.yaml",
wait=True,
)

def get_port(self, resolver_type: ResolverType) -> int:
if resolver_type == ResolverType.RPC:
return self.get_exposed_port(self.rpc)
return self._compose.get_service_port("flagd", 8013)
else:
return self.get_exposed_port(self.ipr)
return self._compose.get_service_port("flagd", 8015)

def get_launchpad_url(self):
return f"http://localhost:{self.get_exposed_port(LAUNCHPAD)}"
def get_exposed_port(self, port: int) -> int:
"""Get mapped port. For FORBIDDEN (9212) returns envoy port, otherwise flagd port."""
if port == FORBIDDEN:
return self._compose.get_service_port("envoy", FORBIDDEN)
return self._compose.get_service_port("flagd", port)

def get_launchpad_url(self) -> str:
port = self._compose.get_service_port("flagd", LAUNCHPAD)
return f"http://localhost:{port}"

def start(self) -> "FlagdContainer":
super().start()
self._checker(self.get_container_host_ip(), self.get_exposed_port(HEALTH_CHECK))
self._compose.start()
host = self._compose.get_service_host("flagd", HEALTH_CHECK) or "localhost"
port = self._compose.get_service_port("flagd", HEALTH_CHECK)
self._checker(host, port)
return self

def stop(self) -> None:
self._compose.stop()

def _checker(self, host: str, port: int) -> None:
# Give an extra second before continuing
time.sleep(1)
# Second we use the GRPC health check endpoint
with grpc.insecure_channel(host + ":" + str(port)) as channel:
# Use the GRPC health check endpoint
with grpc.insecure_channel(f"{host}:{port}") as channel:
health_stub = health_pb2_grpc.HealthStub(channel)

def health_check_call(stub: health_pb2_grpc.HealthStub):
request = health_pb2.HealthCheckRequest()
resp = stub.Check(request)
if resp.status == health_pb2.HealthCheckResponse.SERVING:
return True
elif resp.status == health_pb2.HealthCheckResponse.NOT_SERVING:
try:
resp = stub.Check(request)
return resp.status == health_pb2.HealthCheckResponse.SERVING
except Exception:
return False

# Should succeed
# Check health status every 1 second for 30 seconds
ok = False
for _ in range(30):
Expand All @@ -73,4 +102,4 @@ def health_check_call(stub: health_pb2_grpc.HealthStub):
time.sleep(1)

if not ok:
raise ConnectionError("flagD not ready in time")
raise ConnectionError("flagd not ready in time")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tests.e2e.testfilter import TestFilter

resolver = ResolverType.IN_PROCESS
feature_list = ["~targetURI", "~unixsocket", "~deprecated", "~forbidden"]
feature_list = ["~targetURI", "~unixsocket", "~deprecated"]


def pytest_collection_modifyitems(config, items):
Expand Down
Loading
Loading