Source code for auth.client

"""
Enhanced client library with connection pooling, retry logic, and circuit breaker
"""

import json
from typing import Any, Dict
from urllib.parse import urljoin

import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError
from requests.packages.urllib3.util.retry import Retry

from auth.circuit_breaker import circuit_breaker


[docs] class RetryableHTTPAdapter(HTTPAdapter): """HTTP adapter with retry logic"""
[docs] def __init__( self, retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504), **kwargs ): self.retries = retries self.backoff_factor = backoff_factor self.status_forcelist = status_forcelist super().__init__(**kwargs)
[docs] def init_poolmanager(self, *args, **kwargs): kwargs["retries"] = Retry( total=self.retries, read=self.retries, connect=self.retries, backoff_factor=self.backoff_factor, status_forcelist=self.status_forcelist, ) return super().init_poolmanager(*args, **kwargs)
[docs] class EnhancedAuthClient: """Enhanced client with connection pooling, retry logic, and circuit breaker"""
[docs] def __init__( self, api_key: str, service_url: str, max_retries: int = 3, pool_connections: int = 10, pool_maxsize: int = 20, timeout: int = 30, circuit_breaker_enabled: bool = True, ): """ Initialize the enhanced client Args: api_key: The API key for authentication service_url: The base URL for the auth service max_retries: Number of times to retry failed requests pool_connections: Number of connection pools pool_maxsize: Max connections per pool timeout: Request timeout in seconds circuit_breaker_enabled: Whether circuit breaker is enabled """ self.api_key = api_key self.service_url = service_url self.timeout = timeout self.circuit_breaker_enabled = circuit_breaker_enabled # Create session with connection pooling self.session = requests.Session() # Configure retries retry_strategy = Retry( total=max_retries, status_forcelist=[429, 500, 502, 503, 504], method_whitelist=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"], backoff_factor=0.3, ) # Use our custom adapter for more control adapter = RetryableHTTPAdapter( retries=max_retries, status_forcelist=[429, 500, 502, 503, 504], max_retries=retry_strategy, ) self.session.mount("http://", adapter) self.session.mount("https://", adapter) # Set default headers self.session.headers.update( { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } ) # API endpoints self.endpoints = { "ping": "/ping", "membership": "/api/membership/{user}/{group}", "permission": "/api/permission/{group}/{name}", "has_permission": "/api/has_permission/{user}/{name}", "user_permissions": "/api/user_permissions/{user}", "role_permissions": "/api/role_permissions/{role}", "user_roles": "/api/user_roles/{user}", "role_members": "/api/members/{role}", "roles": "/api/roles", "which_roles_can": "/api/which_roles_can/{name}", "which_users_can": "/api/which_users_can/{name}", "role": "/api/role/{role}", "workflow_users": "/api/workflow/users/{workflow_name}", "workflow_permission": "/api/workflow/user/{user}/can_run/{workflow_name}", }
def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: """ Make an HTTP request with circuit breaker, retry logic, and error handling """ url = urljoin(self.service_url, endpoint) # Prepare the request function for the circuit breaker def request_func(): try: response = self.session.request( method=method, url=url, timeout=self.timeout, **kwargs ) # Raise an exception for bad status codes response.raise_for_status() # Try to parse JSON response try: json_response: Dict[str, Any] = response.json() return json_response except json.JSONDecodeError: # If JSON parsing fails, return the text content text_response: Dict[str, Any] = {"result": response.text} return text_response except requests.exceptions.RequestException as e: # Convert to our expected exception type raise ConnectionError(f"Request failed: {str(e)}") from e if self.circuit_breaker_enabled: # Use circuit breaker to wrap the request try: cb_result: Dict[str, Any] = circuit_breaker("api_call")(request_func)() return cb_result except Exception as e: raise ConnectionError( f"Circuit breaker prevented request: {str(e)}" ) from e else: direct_result: Dict[str, Any] = request_func() return direct_result
[docs] def ping(self) -> Dict[str, Any]: """Health check""" try: return self._make_request("GET", self.endpoints["ping"]) except Exception as e: return {"error": str(e), "success": False}
[docs] def add_membership(self, user: str, group: str) -> Dict[str, Any]: """Add user to a group""" endpoint = self.endpoints["membership"].format(user=user, group=group) try: return self._make_request("POST", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"user": user, "group": group}, }
[docs] def remove_membership(self, user: str, group: str) -> Dict[str, Any]: """Remove user from a group""" endpoint = self.endpoints["membership"].format(user=user, group=group) try: return self._make_request("DELETE", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"user": user, "group": group}, }
[docs] def has_membership(self, user: str, group: str) -> Dict[str, Any]: """Check if user is member of a group""" endpoint = self.endpoints["membership"].format(user=user, group=group) try: return self._make_request("GET", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"user": user, "group": group}, }
[docs] def add_permission(self, group: str, name: str) -> Dict[str, Any]: """Add permission to a group""" endpoint = self.endpoints["permission"].format(group=group, name=name) try: return self._make_request("POST", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"group": group, "name": name}, }
[docs] def remove_permission(self, group: str, name: str) -> Dict[str, Any]: """Remove permission from a group""" endpoint = self.endpoints["permission"].format(group=group, name=name) try: return self._make_request("DELETE", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"group": group, "name": name}, }
[docs] def has_permission(self, group: str, name: str) -> Dict[str, Any]: """Check if group has permission""" endpoint = self.endpoints["permission"].format(group=group, name=name) try: return self._make_request("GET", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"group": group, "name": name}, }
[docs] def user_has_permission(self, user: str, name: str) -> Dict[str, Any]: """Check if user has permission""" endpoint = self.endpoints["has_permission"].format(user=user, name=name) try: return self._make_request("GET", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"user": user, "name": name}, }
[docs] def get_user_permissions(self, user: str) -> Dict[str, Any]: """Get all permissions for a user""" endpoint = self.endpoints["user_permissions"].format(user=user) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"user": user}}
[docs] def get_role_permissions(self, role: str) -> Dict[str, Any]: """Get all permissions for a role""" endpoint = self.endpoints["role_permissions"].format(role=role) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"role": role}}
[docs] def get_user_roles(self, user: str) -> Dict[str, Any]: """Get all roles for a user""" endpoint = self.endpoints["user_roles"].format(user=user) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"user": user}}
[docs] def get_role_members(self, role: str) -> Dict[str, Any]: """Get all members of a role""" endpoint = self.endpoints["role_members"].format(role=role) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"role": role}}
[docs] def list_roles(self) -> Dict[str, Any]: """List all roles""" try: return self._make_request("GET", self.endpoints["roles"]) except Exception as e: return {"error": str(e), "success": False}
[docs] def which_roles_can(self, name: str) -> Dict[str, Any]: """Get roles that can perform an action""" endpoint = self.endpoints["which_roles_can"].format(name=name) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"name": name}}
[docs] def which_users_can(self, name: str) -> Dict[str, Any]: """Get users that can perform an action""" endpoint = self.endpoints["which_users_can"].format(name=name) try: return self._make_request("GET", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"name": name}}
[docs] def create_role(self, role: str) -> Dict[str, Any]: """Create a new role""" endpoint = self.endpoints["role"].format(role=role) try: return self._make_request("POST", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"role": role}}
[docs] def delete_role(self, role: str) -> Dict[str, Any]: """Delete a role""" endpoint = self.endpoints["role"].format(role=role) try: return self._make_request("DELETE", endpoint) except Exception as e: return {"error": str(e), "success": False, "data": {"role": role}}
# Workflow-related methods
[docs] def get_users_for_workflow(self, workflow_name: str) -> Dict[str, Any]: """Get all users who can run a specific workflow""" endpoint = self.endpoints["workflow_users"].format(workflow_name=workflow_name) try: return self._make_request("GET", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"workflow_name": workflow_name}, }
[docs] def check_user_workflow_permission( self, user: str, workflow_name: str ) -> Dict[str, Any]: """Check if a user can run a specific workflow""" endpoint = self.endpoints["workflow_permission"].format( user=user, workflow_name=workflow_name ) try: return self._make_request("GET", endpoint) except Exception as e: return { "error": str(e), "success": False, "data": {"user": user, "workflow_name": workflow_name}, }
[docs] def close(self): """Close the session""" if self.session: self.session.close()
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
# For backward compatibility with the old client
[docs] class Client(EnhancedAuthClient): """Legacy client class for backward compatibility""" pass