Module aws_lambda_powertools.utilities.feature_flags
Advanced feature flags utility
Expand source code
"""Advanced feature flags utility"""
from .appconfig import AppConfigStore
from .base import StoreProvider
from .exceptions import ConfigurationStoreError
from .feature_flags import FeatureFlags
from .schema import RuleAction, SchemaValidator
__all__ = [
"ConfigurationStoreError",
"FeatureFlags",
"RuleAction",
"SchemaValidator",
"AppConfigStore",
"StoreProvider",
]
Sub-modules
aws_lambda_powertools.utilities.feature_flags.appconfig
aws_lambda_powertools.utilities.feature_flags.base
aws_lambda_powertools.utilities.feature_flags.exceptions
aws_lambda_powertools.utilities.feature_flags.feature_flags
aws_lambda_powertools.utilities.feature_flags.schema
Classes
class AppConfigStore (environment: str, application: str, name: str, max_age: int = 5, sdk_config: Optional[botocore.config.Config] = None, envelope: Optional[str] = '', jmespath_options: Optional[Dict[~KT, ~VT]] = None, logger: Union[logging.Logger, Logger, None] = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
This class fetches JSON schemas from AWS AppConfig
Parameters
environment
:str
- Appconfig environment, e.g. 'dev/test' etc.
application
:str
- AppConfig application name, e.g. 'powertools'
name
:str
- AppConfig configuration name e.g.
my_conf
max_age
:int
- cache expiration time in seconds, or how often to call AppConfig to fetch latest configuration
sdk_config
:Optional[Config]
- Botocore Config object to pass during client initialization
envelope
:Optional[str]
- JMESPath expression to pluck feature flags data from config
jmespath_options
:Optional[Dict]
- Alternative JMESPath options to be included when filtering expr
logger
:A logging object
- Used to log messages. If None is supplied, one will be created.
Expand source code
class AppConfigStore(StoreProvider): def __init__( self, environment: str, application: str, name: str, max_age: int = 5, sdk_config: Optional[Config] = None, envelope: Optional[str] = "", jmespath_options: Optional[Dict] = None, logger: Optional[Union[logging.Logger, Logger]] = None, ): """This class fetches JSON schemas from AWS AppConfig Parameters ---------- environment: str Appconfig environment, e.g. 'dev/test' etc. application: str AppConfig application name, e.g. 'powertools' name: str AppConfig configuration name e.g. `my_conf` max_age: int cache expiration time in seconds, or how often to call AppConfig to fetch latest configuration sdk_config: Optional[Config] Botocore Config object to pass during client initialization envelope : Optional[str] JMESPath expression to pluck feature flags data from config jmespath_options : Optional[Dict] Alternative JMESPath options to be included when filtering expr logger: A logging object Used to log messages. If None is supplied, one will be created. """ super().__init__() self.logger = logger or logging.getLogger(__name__) self.environment = environment self.application = application self.name = name self.cache_seconds = max_age self.config = sdk_config self.envelope = envelope self.jmespath_options = jmespath_options self._conf_store = AppConfigProvider(environment=environment, application=application, config=sdk_config) @property def get_raw_configuration(self) -> Dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig""" try: # parse result conf as JSON, keep in cache for self.max_age seconds self.logger.debug( "Fetching configuration from the store", extra={"param_name": self.name, "max_age": self.cache_seconds} ) return cast( dict, self._conf_store.get( name=self.name, transform=TRANSFORM_TYPE, max_age=self.cache_seconds, ), ) except (GetParameterError, TransformParameterError) as exc: err_msg = traceback.format_exc() if "AccessDenied" in err_msg: raise StoreClientError(err_msg) from exc raise ConfigurationStoreError("Unable to get AWS AppConfig configuration file") from exc def get_configuration(self) -> Dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig. Raises ------ ConfigurationStoreError Any validation error or AppConfig error that can occur Returns ------- Dict[str, Any] parsed JSON dictionary """ config = self.get_raw_configuration if self.envelope: self.logger.debug("Envelope enabled; extracting data from config", extra={"envelope": self.envelope}) config = jmespath_utils.extract_data_from_envelope( data=config, envelope=self.envelope, jmespath_options=self.jmespath_options ) return config
Ancestors
- StoreProvider
- abc.ABC
Instance variables
var get_raw_configuration : Dict[str, Any]
-
Fetch feature schema configuration from AWS AppConfig
Expand source code
@property def get_raw_configuration(self) -> Dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig""" try: # parse result conf as JSON, keep in cache for self.max_age seconds self.logger.debug( "Fetching configuration from the store", extra={"param_name": self.name, "max_age": self.cache_seconds} ) return cast( dict, self._conf_store.get( name=self.name, transform=TRANSFORM_TYPE, max_age=self.cache_seconds, ), ) except (GetParameterError, TransformParameterError) as exc: err_msg = traceback.format_exc() if "AccessDenied" in err_msg: raise StoreClientError(err_msg) from exc raise ConfigurationStoreError("Unable to get AWS AppConfig configuration file") from exc
Methods
def get_configuration(self) ‑> Dict[str, Any]
-
Fetch feature schema configuration from AWS AppConfig
If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig.
Raises
ConfigurationStoreError
- Any validation error or AppConfig error that can occur
Returns
Dict[str, Any]
- parsed JSON dictionary
Expand source code
def get_configuration(self) -> Dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig. Raises ------ ConfigurationStoreError Any validation error or AppConfig error that can occur Returns ------- Dict[str, Any] parsed JSON dictionary """ config = self.get_raw_configuration if self.envelope: self.logger.debug("Envelope enabled; extracting data from config", extra={"envelope": self.envelope}) config = jmespath_utils.extract_data_from_envelope( data=config, envelope=self.envelope, jmespath_options=self.jmespath_options ) return config
class ConfigurationStoreError (*args, **kwargs)
-
When a configuration store raises an exception on config retrieval or parsing
Expand source code
class ConfigurationStoreError(Exception): """When a configuration store raises an exception on config retrieval or parsing"""
Ancestors
- builtins.Exception
- builtins.BaseException
class FeatureFlags (store: StoreProvider, logger: Union[logging.Logger, Logger, None] = None)
-
Evaluates whether feature flags should be enabled based on a given context.
It uses the provided store to fetch feature flag rules before evaluating them.
Examples
from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore app_config = AppConfigStore( environment="test", application="powertools", name="test_conf_name", max_age=300, envelope="features" ) feature_flags: FeatureFlags = FeatureFlags(store=app_config)
Parameters
store
:StoreProvider
- Store to use to fetch feature flag schema configuration.
logger
:A logging object
- Used to log messages. If None is supplied, one will be created.
Expand source code
class FeatureFlags: def __init__(self, store: StoreProvider, logger: Optional[Union[logging.Logger, Logger]] = None): """Evaluates whether feature flags should be enabled based on a given context. It uses the provided store to fetch feature flag rules before evaluating them. Examples -------- ```python from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore app_config = AppConfigStore( environment="test", application="powertools", name="test_conf_name", max_age=300, envelope="features" ) feature_flags: FeatureFlags = FeatureFlags(store=app_config) ``` Parameters ---------- store: StoreProvider Store to use to fetch feature flag schema configuration. logger: A logging object Used to log messages. If None is supplied, one will be created. """ self.store = store self.logger = logger or logging.getLogger(__name__) def _match_by_action(self, action: str, condition_value: Any, context_value: Any) -> bool: if not context_value: return False mapping_by_action = { schema.RuleAction.EQUALS.value: lambda a, b: a == b, schema.RuleAction.NOT_EQUALS.value: lambda a, b: a != b, schema.RuleAction.KEY_GREATER_THAN_VALUE.value: lambda a, b: a > b, schema.RuleAction.KEY_GREATER_THAN_OR_EQUAL_VALUE.value: lambda a, b: a >= b, schema.RuleAction.KEY_LESS_THAN_VALUE.value: lambda a, b: a < b, schema.RuleAction.KEY_LESS_THAN_OR_EQUAL_VALUE.value: lambda a, b: a <= b, schema.RuleAction.STARTSWITH.value: lambda a, b: a.startswith(b), schema.RuleAction.ENDSWITH.value: lambda a, b: a.endswith(b), schema.RuleAction.IN.value: lambda a, b: a in b, schema.RuleAction.NOT_IN.value: lambda a, b: a not in b, schema.RuleAction.KEY_IN_VALUE.value: lambda a, b: a in b, schema.RuleAction.KEY_NOT_IN_VALUE.value: lambda a, b: a not in b, schema.RuleAction.VALUE_IN_KEY.value: lambda a, b: b in a, schema.RuleAction.VALUE_NOT_IN_KEY.value: lambda a, b: b not in a, } try: func = mapping_by_action.get(action, lambda a, b: False) return func(context_value, condition_value) except Exception as exc: self.logger.debug(f"caught exception while matching action: action={action}, exception={str(exc)}") return False def _evaluate_conditions( self, rule_name: str, feature_name: str, rule: Dict[str, Any], context: Dict[str, Any] ) -> bool: """Evaluates whether context matches conditions, return False otherwise""" rule_match_value = rule.get(schema.RULE_MATCH_VALUE) conditions = cast(List[Dict], rule.get(schema.CONDITIONS_KEY)) if not conditions: self.logger.debug( f"rule did not match, no conditions to match, rule_name={rule_name}, rule_value={rule_match_value}, " f"name={feature_name} " ) return False for condition in conditions: context_value = context.get(str(condition.get(schema.CONDITION_KEY))) cond_action = condition.get(schema.CONDITION_ACTION, "") cond_value = condition.get(schema.CONDITION_VALUE) if not self._match_by_action(action=cond_action, condition_value=cond_value, context_value=context_value): self.logger.debug( f"rule did not match action, rule_name={rule_name}, rule_value={rule_match_value}, " f"name={feature_name}, context_value={str(context_value)} " ) return False # context doesn't match condition self.logger.debug(f"rule matched, rule_name={rule_name}, rule_value={rule_match_value}, name={feature_name}") return True def _evaluate_rules( self, *, feature_name: str, context: Dict[str, Any], feat_default: Any, rules: Dict[str, Any], boolean_feature: bool, ) -> bool: """Evaluates whether context matches rules and conditions, otherwise return feature default""" for rule_name, rule in rules.items(): rule_match_value = rule.get(schema.RULE_MATCH_VALUE) # Context might contain PII data; do not log its value self.logger.debug( f"Evaluating rule matching, rule={rule_name}, feature={feature_name}, default={str(feat_default)}, boolean_feature={boolean_feature}" # noqa: E501 ) if self._evaluate_conditions(rule_name=rule_name, feature_name=feature_name, rule=rule, context=context): # Maintenance: Revisit before going GA. return bool(rule_match_value) if boolean_feature else rule_match_value # no rule matched, return default value of feature self.logger.debug( f"no rule matched, returning feature default, default={str(feat_default)}, name={feature_name}, boolean_feature={boolean_feature}" # noqa: E501 ) return feat_default def get_configuration(self) -> Dict: """Get validated feature flag schema from configured store. Largely used to aid testing, since it's called by `evaluate` and `get_enabled_features` methods. Raises ------ ConfigurationStoreError Any propagated error from store SchemaValidationError When schema doesn't conform with feature flag schema Returns ------ Dict[str, Dict] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ # parse result conf as JSON, keep in cache for max age defined in store self.logger.debug(f"Fetching schema from registered store, store={self.store}") config: Dict = self.store.get_configuration() validator = schema.SchemaValidator(schema=config) validator.validate() return config def evaluate(self, *, name: str, context: Optional[Dict[str, Any]] = None, default: JSONType) -> JSONType: """Evaluate whether a feature flag should be enabled according to stored schema and input context **Logic when evaluating a feature flag** 1. Feature exists and a rule matches, returns when_match value 2. Feature exists but has either no rules or no match, return feature default value 3. Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided Parameters ---------- name: str feature name to evaluate context: Optional[Dict[str, Any]] Attributes that should be evaluated against the stored schema. for example: `{"tenant_id": "X", "username": "Y", "region": "Z"}` default: JSONType default value if feature flag doesn't exist in the schema, or there has been an error when fetching the configuration from the store Can be boolean or any JSON values for non-boolean features. Returns ------ JSONType whether feature should be enabled (bool flags) or JSON value when non-bool feature matches Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} try: features = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning default provided, reason={err}") return default feature = features.get(name) if feature is None: self.logger.debug(f"Feature not found; returning default provided, name={name}, default={default}") return default rules = feature.get(schema.RULES_KEY) feat_default = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. It'll need minor implementation changes, docs changes, and maybe refactor # get_enabled_features. We can minimize breaking change, despite Beta label, by having a new # method `get_matching_features` returning Dict[feature_name, feature_value] boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True ) # backwards compatability ,assume feature flag if not rules: self.logger.debug( f"no rules found, returning feature default, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}" # noqa: E501 ) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. return bool(feat_default) if boolean_feature else feat_default self.logger.debug( f"looking for rule match, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}" # noqa: E501 ) return self._evaluate_rules( feature_name=name, context=context, feat_default=feat_default, rules=rules, boolean_feature=boolean_feature ) def get_enabled_features(self, *, context: Optional[Dict[str, Any]] = None) -> List[str]: """Get all enabled feature flags while also taking into account context (when a feature has defined rules) Parameters ---------- context: Optional[Dict[str, Any]] dict of attributes that you would like to match the rules against, can be `{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}` etc. Returns ---------- List[str] list of all feature names that either matches context or have True as default **Example** ```python ["premium_features", "my_feature_two", "always_true_feature"] ``` Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} features_enabled: List[str] = [] try: features: Dict[str, Any] = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning empty list, reason={err}") return features_enabled self.logger.debug("Evaluating all features") for name, feature in features.items(): rules = feature.get(schema.RULES_KEY, {}) feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True ) # backwards compatability ,assume feature flag if feature_default_value and not rules: self.logger.debug(f"feature is enabled by default and has no defined rules, name={name}") features_enabled.append(name) elif self._evaluate_rules( feature_name=name, context=context, feat_default=feature_default_value, rules=rules, boolean_feature=boolean_feature, ): self.logger.debug(f"feature's calculated value is True, name={name}") features_enabled.append(name) return features_enabled
Methods
def evaluate(self, *, name: str, context: Optional[Dict[str, Any]] = None, default: Union[str, int, float, bool, None, Dict[str, Any], List[Any]]) ‑> Union[str, int, float, bool, None, Dict[str, Any], List[Any]]
-
Evaluate whether a feature flag should be enabled according to stored schema and input context
Logic when evaluating a feature flag
- Feature exists and a rule matches, returns when_match value
- Feature exists but has either no rules or no match, return feature default value
- Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided
Parameters
name
:str
- feature name to evaluate
context
:Optional[Dict[str, Any]]
-
Attributes that should be evaluated against the stored schema.
for example:
{"tenant_id": "X", "username": "Y", "region": "Z"}
default
:JSONType
- default value if feature flag doesn't exist in the schema, or there has been an error when fetching the configuration from the store Can be boolean or any JSON values for non-boolean features.
Returns
JSONType
- whether feature should be enabled (bool flags) or JSON value when non-bool feature matches
Raises
SchemaValidationError
- When schema doesn't conform with feature flag schema
Expand source code
def evaluate(self, *, name: str, context: Optional[Dict[str, Any]] = None, default: JSONType) -> JSONType: """Evaluate whether a feature flag should be enabled according to stored schema and input context **Logic when evaluating a feature flag** 1. Feature exists and a rule matches, returns when_match value 2. Feature exists but has either no rules or no match, return feature default value 3. Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided Parameters ---------- name: str feature name to evaluate context: Optional[Dict[str, Any]] Attributes that should be evaluated against the stored schema. for example: `{"tenant_id": "X", "username": "Y", "region": "Z"}` default: JSONType default value if feature flag doesn't exist in the schema, or there has been an error when fetching the configuration from the store Can be boolean or any JSON values for non-boolean features. Returns ------ JSONType whether feature should be enabled (bool flags) or JSON value when non-bool feature matches Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} try: features = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning default provided, reason={err}") return default feature = features.get(name) if feature is None: self.logger.debug(f"Feature not found; returning default provided, name={name}, default={default}") return default rules = feature.get(schema.RULES_KEY) feat_default = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. It'll need minor implementation changes, docs changes, and maybe refactor # get_enabled_features. We can minimize breaking change, despite Beta label, by having a new # method `get_matching_features` returning Dict[feature_name, feature_value] boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True ) # backwards compatability ,assume feature flag if not rules: self.logger.debug( f"no rules found, returning feature default, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}" # noqa: E501 ) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. return bool(feat_default) if boolean_feature else feat_default self.logger.debug( f"looking for rule match, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}" # noqa: E501 ) return self._evaluate_rules( feature_name=name, context=context, feat_default=feat_default, rules=rules, boolean_feature=boolean_feature )
def get_configuration(self) ‑> Dict[~KT, ~VT]
-
Get validated feature flag schema from configured store.
Largely used to aid testing, since it's called by
evaluate
andget_enabled_features
methods.Raises
ConfigurationStoreError
- Any propagated error from store
SchemaValidationError
- When schema doesn't conform with feature flag schema
Returns
Dict[str, Dict]
-
parsed JSON dictionary
Example
{ "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } }
Expand source code
def get_configuration(self) -> Dict: """Get validated feature flag schema from configured store. Largely used to aid testing, since it's called by `evaluate` and `get_enabled_features` methods. Raises ------ ConfigurationStoreError Any propagated error from store SchemaValidationError When schema doesn't conform with feature flag schema Returns ------ Dict[str, Dict] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ # parse result conf as JSON, keep in cache for max age defined in store self.logger.debug(f"Fetching schema from registered store, store={self.store}") config: Dict = self.store.get_configuration() validator = schema.SchemaValidator(schema=config) validator.validate() return config
def get_enabled_features(self, *, context: Optional[Dict[str, Any]] = None) ‑> List[str]
-
Get all enabled feature flags while also taking into account context (when a feature has defined rules)
Parameters
context
:Optional[Dict[str, Any]]
- dict of attributes that you would like to match the rules
against, can be
{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}
etc.
Returns
List[str]
-
list of all feature names that either matches context or have True as default
Example
["premium_features", "my_feature_two", "always_true_feature"]
Raises
SchemaValidationError
- When schema doesn't conform with feature flag schema
Expand source code
def get_enabled_features(self, *, context: Optional[Dict[str, Any]] = None) -> List[str]: """Get all enabled feature flags while also taking into account context (when a feature has defined rules) Parameters ---------- context: Optional[Dict[str, Any]] dict of attributes that you would like to match the rules against, can be `{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}` etc. Returns ---------- List[str] list of all feature names that either matches context or have True as default **Example** ```python ["premium_features", "my_feature_two", "always_true_feature"] ``` Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} features_enabled: List[str] = [] try: features: Dict[str, Any] = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning empty list, reason={err}") return features_enabled self.logger.debug("Evaluating all features") for name, feature in features.items(): rules = feature.get(schema.RULES_KEY, {}) feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True ) # backwards compatability ,assume feature flag if feature_default_value and not rules: self.logger.debug(f"feature is enabled by default and has no defined rules, name={name}") features_enabled.append(name) elif self._evaluate_rules( feature_name=name, context=context, feat_default=feature_default_value, rules=rules, boolean_feature=boolean_feature, ): self.logger.debug(f"feature's calculated value is True, name={name}") features_enabled.append(name) return features_enabled
class RuleAction (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class RuleAction(str, Enum): EQUALS = "EQUALS" NOT_EQUALS = "NOT_EQUALS" KEY_GREATER_THAN_VALUE = "KEY_GREATER_THAN_VALUE" KEY_GREATER_THAN_OR_EQUAL_VALUE = "KEY_GREATER_THAN_OR_EQUAL_VALUE" KEY_LESS_THAN_VALUE = "KEY_LESS_THAN_VALUE" KEY_LESS_THAN_OR_EQUAL_VALUE = "KEY_LESS_THAN_OR_EQUAL_VALUE" STARTSWITH = "STARTSWITH" ENDSWITH = "ENDSWITH" IN = "IN" NOT_IN = "NOT_IN" KEY_IN_VALUE = "KEY_IN_VALUE" KEY_NOT_IN_VALUE = "KEY_NOT_IN_VALUE" VALUE_IN_KEY = "VALUE_IN_KEY" VALUE_NOT_IN_KEY = "VALUE_NOT_IN_KEY"
Ancestors
- builtins.str
- enum.Enum
Class variables
var ENDSWITH
var EQUALS
var IN
var KEY_GREATER_THAN_OR_EQUAL_VALUE
var KEY_GREATER_THAN_VALUE
var KEY_IN_VALUE
var KEY_LESS_THAN_OR_EQUAL_VALUE
var KEY_LESS_THAN_VALUE
var KEY_NOT_IN_VALUE
var NOT_EQUALS
var NOT_IN
var STARTSWITH
var VALUE_IN_KEY
var VALUE_NOT_IN_KEY
class SchemaValidator (schema: Dict[str, Any], logger: Union[logging.Logger, Logger, None] = None)
-
Validates feature flag schema configuration
Raises
SchemaValidationError
- When schema doesn't conform with feature flag schema
Schema
Feature object
A dictionary containing default value and rules for matching. The value MUST be an object and MIGHT contain the following members:
- default:
Union[bool, JSONType]
. Defines default feature value. This MUST be present - boolean_type: bool. Defines whether feature has non-boolean value (
JSONType
). This MIGHT be present - rules:
Dict[str, Dict]
. Rules object. This MIGHT be present
JSONType
being any JSON primitive value:Union[str, int, float, bool, None, Dict[str, Any], List[Any]]
{ "my_feature": { "default": true, "rules": {} }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": {} } }
Rules object
A dictionary with each rule and their conditions that a feature might have. The value MIGHT be present, and when defined it MUST contain the following members:
- when_match:
Union[bool, JSONType]
. Defines value to return when context matches conditions - conditions:
List[Dict]
. Conditions object. This MUST be present
{ "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [] } } }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": { "tenant id equals 345345435": { "when_match": {"group": "admin"}, "conditions": [] } } } }
Conditions object
A list of dictionaries containing conditions for a given rule. The value MUST contain the following members:
-
action:
str
. Operation to perform to match a key and value. The value MUST be either EQUALS, STARTSWITH, ENDSWITH, KEY_IN_VALUE KEY_NOT_IN_VALUE VALUE_IN_KEY VALUE_NOT_IN_KEY -
key:
str
. Key in given context to perform operation - value:
Any
. Value in given context that should match action operation.
{ "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [ { "action": "EQUALS", "key": "tenant_id", "value": "345345435", } ] } } } }
Expand source code
class SchemaValidator(BaseValidator): """Validates feature flag schema configuration Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema Schema ------ **Feature object** A dictionary containing default value and rules for matching. The value MUST be an object and MIGHT contain the following members: * **default**: `Union[bool, JSONType]`. Defines default feature value. This MUST be present * **boolean_type**: bool. Defines whether feature has non-boolean value (`JSONType`). This MIGHT be present * **rules**: `Dict[str, Dict]`. Rules object. This MIGHT be present `JSONType` being any JSON primitive value: `Union[str, int, float, bool, None, Dict[str, Any], List[Any]]` ```json { "my_feature": { "default": true, "rules": {} }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": {} } } ``` **Rules object** A dictionary with each rule and their conditions that a feature might have. The value MIGHT be present, and when defined it MUST contain the following members: * **when_match**: `Union[bool, JSONType]`. Defines value to return when context matches conditions * **conditions**: `List[Dict]`. Conditions object. This MUST be present ```json { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [] } } }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": { "tenant id equals 345345435": { "when_match": {"group": "admin"}, "conditions": [] } } } } ``` **Conditions object** A list of dictionaries containing conditions for a given rule. The value MUST contain the following members: * **action**: `str`. Operation to perform to match a key and value. The value MUST be either EQUALS, STARTSWITH, ENDSWITH, KEY_IN_VALUE KEY_NOT_IN_VALUE VALUE_IN_KEY VALUE_NOT_IN_KEY * **key**: `str`. Key in given context to perform operation * **value**: `Any`. Value in given context that should match action operation. ```json { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [ { "action": "EQUALS", "key": "tenant_id", "value": "345345435", } ] } } } } ``` """ def __init__(self, schema: Dict[str, Any], logger: Optional[Union[logging.Logger, Logger]] = None): self.schema = schema self.logger = logger or logging.getLogger(__name__) def validate(self) -> None: self.logger.debug("Validating schema") if not isinstance(self.schema, dict): raise SchemaValidationError(f"Features must be a dictionary, schema={str(self.schema)}") features = FeaturesValidator(schema=self.schema) features.validate()
Ancestors
- BaseValidator
- abc.ABC
Methods
def validate(self) ‑> None
-
Expand source code
def validate(self) -> None: self.logger.debug("Validating schema") if not isinstance(self.schema, dict): raise SchemaValidationError(f"Features must be a dictionary, schema={str(self.schema)}") features = FeaturesValidator(schema=self.schema) features.validate()
class StoreProvider
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class StoreProvider(ABC): @property @abstractmethod def get_raw_configuration(self) -> Dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary""" raise NotImplementedError() # pragma: no cover @abstractmethod def get_configuration(self) -> Dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store. Raises ------ ConfigurationStoreError Any error that can occur during schema fetch or JSON parse Returns ------- Dict[str, Any] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ raise NotImplementedError() # pragma: no cover
Ancestors
- abc.ABC
Subclasses
Instance variables
var get_raw_configuration : Dict[str, Any]
-
Get configuration from any store and return the parsed JSON dictionary
Expand source code
@property @abstractmethod def get_raw_configuration(self) -> Dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary""" raise NotImplementedError() # pragma: no cover
Methods
def get_configuration(self) ‑> Dict[str, Any]
-
Get configuration from any store and return the parsed JSON dictionary
If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store.
Raises
ConfigurationStoreError
- Any error that can occur during schema fetch or JSON parse
Returns
Dict[str, Any]
-
parsed JSON dictionary
Example
{ "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } }
Expand source code
@abstractmethod def get_configuration(self) -> Dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store. Raises ------ ConfigurationStoreError Any error that can occur during schema fetch or JSON parse Returns ------- Dict[str, Any] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ raise NotImplementedError() # pragma: no cover