RBAC & ABAC for AI Systems

Part of Module 5: Security & Compliance

Access control is fundamental to securing AI systems and agents. This module explores Role-Based Access Control (RBAC) and Attribute-Based Access Control (ABAC), comparing their strengths and implementation strategies for AI applications, LLM agents, and model governance.

Understanding RBAC for AI Systems

Role-Based Access Control Fundamentals

RBAC assigns permissions to roles rather than individual users, simplifying management in AI systems where different stakeholders need varying levels of access to models, data, and predictions.

Core Components:

  • Users: Data scientists, ML engineers, business analysts
  • Roles: Model Developer, Model Reviewer, Prediction Consumer
  • Permissions: Train models, deploy models, query predictions
  • Sessions: Active user-role assignments with context

# Python implementation using Casbin for RBAC
import casbin

class AISystemRBAC:
    """RBAC implementation for AI model access"""
    
    def __init__(self):
        self.enforcer = casbin.Enforcer(
            'rbac_model.conf',
            'rbac_policy.csv'
        )
    
    def check_model_access(self, user: str, model: str, action: str) -> bool:
        """Check if user can perform action on model"""
        return self.enforcer.enforce(user, model, action)
    
    def assign_role(self, user: str, role: str):
        """Assign role to user"""
        self.enforcer.add_role_for_user(user, role)
    
    def grant_permission(self, role: str, resource: str, action: str):
        """Grant permission to role"""
        self.enforcer.add_policy(role, resource, action)

# RBAC Model Configuration (rbac_model.conf)
rbac_model = """
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
"""

RBAC in Practice: ML Platform

Consider a machine learning platform with the following roles; a sample policy file sketch follows the list:

  • Data Engineer: Can access raw data, create features
  • ML Developer: Can train models, run experiments
  • ML Reviewer: Can approve models for production
  • API Consumer: Can only query deployed models
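
A matching rbac_policy.csv sketch for these roles (the object and action names are illustrative, not prescribed by Casbin):

# rbac_policy.csv — p lines are permissions, g lines are role assignments
p, data_engineer, raw_data, read
p, data_engineer, feature_store, create
p, ml_developer, model, train
p, ml_developer, experiment, run
p, ml_reviewer, model, approve
p, api_consumer, model, query
g, alice, ml_developer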

Attribute-Based Access Control for AI

ABAC: Fine-Grained Control for AI Agents

ABAC uses attributes of users, resources, actions, and environment to make access decisions. This is particularly powerful for AI agents that need context-aware permissions.

Key Attributes in AI Systems:

  • User Attributes: Department, clearance level, expertise
  • Resource Attributes: Model sensitivity, data classification, accuracy threshold
  • Action Attributes: Purpose, risk level, audit requirement
  • Environmental: Time, location, threat level, compliance mode

# ABAC implementation using OPA (Open Policy Agent)
import requests
import json
from typing import Dict, Any

class AISystemABAC:
    """ABAC implementation using OPA for AI systems"""
    
    def __init__(self, opa_url: str = "http://localhost:8181"):
        self.opa_url = opa_url
    
    def check_access(self, 
                     user_attrs: Dict[str, Any],
                     resource_attrs: Dict[str, Any],
                     action: str,
                     environment: Dict[str, Any]) -> bool:
        """Evaluate ABAC policy for AI resource access"""
        
        input_data = {
            "input": {
                "user": user_attrs,
                "resource": resource_attrs,
                "action": action,
                "environment": environment
            }
        }
        
        response = requests.post(
            f"{self.opa_url}/v1/data/ai/authz/allow",
            json=input_data,
            timeout=5,
        )
        response.raise_for_status()
        
        # OPA returns {"result": <value>} when the queried rule is defined
        return response.json().get("result", False)

# OPA Rego Policy for AI Systems
rego_policy = """
package ai.authz

default allow = false

# Allow model training for ML engineers in dev environment
allow {
    input.user.role == "ml_engineer"
    input.action == "train"
    input.resource.type == "model"
    input.environment.stage == "development"
}

# Allow production deployment only with approval
allow {
    input.user.role == "ml_reviewer"
    input.action == "deploy"
    input.resource.type == "model"
    input.resource.accuracy >= 0.95
    input.resource.approved == true
    input.environment.stage == "production"
}

# PII data access requires special clearance
allow {
    input.action == "read"
    input.resource.contains_pii == true
    input.user.clearance_level >= 3
    input.user.completed_privacy_training == true
}
"""

RBAC vs ABAC Comparison

Aspect        RBAC                            ABAC
------------  ------------------------------  -----------------------------------
Complexity    Simple, role-based hierarchy    Complex, multi-dimensional policies
Granularity   Coarse-grained, role level      Fine-grained, attribute level
Scalability   Good for stable organizations   Excellent for dynamic environments
Management    Easy to understand and audit    Requires policy expertise
AI Use Cases  Model lifecycle stages          Context-aware agent permissions
Performance   Fast role checks                Policy evaluation overhead

Implementing Hybrid RBAC-ABAC for AI

Combining RBAC and ABAC

Many AI systems benefit from a hybrid approach: RBAC for basic access control and ABAC for fine-grained, context-aware decisions.

# Hybrid RBAC-ABAC implementation
class HybridAccessControl:
    """Combines RBAC for base permissions with ABAC for fine control"""
    
    def __init__(self):
        self.rbac = AISystemRBAC()
        self.abac = AISystemABAC()
    
    def authorize_model_action(self, 
                               user_id: str,
                               model_id: str,
                               action: str,
                               context: Dict[str, Any]) -> bool:
        """Two-phase authorization check"""
        
        # Phase 1: RBAC check for basic permission
        if not self.rbac.check_model_access(user_id, model_id, action):
            return False
        
        # Phase 2: ABAC check for context-specific rules
        user_attrs = self.get_user_attributes(user_id)
        model_attrs = self.get_model_attributes(model_id)
        
        return self.abac.check_access(
            user_attrs=user_attrs,
            resource_attrs=model_attrs,
            action=action,
            environment=context
        )
    
    def authorize_agent_action(self,
                               agent_id: str,
                               target_resource: str,
                               action: str,
                               constraints: Dict[str, Any]) -> bool:
        """Authorize AI agent actions with constraints"""
        
        agent_attrs = {
            "id": agent_id,
            "capabilities": self.get_agent_capabilities(agent_id),
            "trust_level": self.get_agent_trust_score(agent_id)
        }
        
        resource_attrs = {
            "type": self.get_resource_type(target_resource),
            "sensitivity": self.get_resource_sensitivity(target_resource),
            "owner": self.get_resource_owner(target_resource)
        }
        
        # Apply time-based and rate-limiting constraints
        if "max_requests_per_hour" in constraints:
            if self.check_rate_limit(agent_id) > constraints["max_requests_per_hour"]:
                return False
        
        return self.abac.check_access(
            user_attrs=agent_attrs,
            resource_attrs=resource_attrs,
            action=action,
            environment=constraints
        )
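
A usage sketch for the two-phase flow; the get_* helper lookups are assumed to be implemented against your identity provider and model registry:

# Hypothetical hybrid authorization call
ac = HybridAccessControl()

allowed = ac.authorize_model_action(
    user_id="alice",
    model_id="fraud_model",
    action="deploy",
    context={"stage": "production", "request_time": "2024-06-01T09:00:00Z"},
)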

Access Control for LLM Agents

Securing Autonomous AI Agents

LLM agents require special access control considerations due to their autonomous nature and potential for unexpected behaviors.

Agent Permission Framework:

  • Capability-based: Define what tools/APIs the agent can access
  • Scope-limited: Restrict data and system boundaries
  • Time-bounded: Set temporal limits on permissions
  • Auditable: Log all agent actions for review

# LLM Agent permission management
from enum import Enum
from datetime import datetime, timedelta
from typing import List
import hashlib

class AgentCapability(Enum):
    READ_PUBLIC_DATA = "read_public"
    READ_PRIVATE_DATA = "read_private"
    WRITE_DATA = "write_data"
    EXECUTE_CODE = "execute_code"
    CALL_EXTERNAL_API = "call_api"
    MODIFY_SYSTEM = "modify_system"

class LLMAgentAccessControl:
    """Access control system for LLM agents"""
    
    def __init__(self):
        self.agent_permissions = {}
        self.audit_log = []
    
    def create_agent_session(self,
                             agent_id: str,
                             purpose: str,
                             capabilities: List[AgentCapability],
                             duration_hours: int = 1) -> str:
        """Create time-bounded session for agent"""
        
        session_id = hashlib.sha256(
            f"{agent_id}{datetime.now()}".encode()
        ).hexdigest()[:16]
        
        self.agent_permissions[session_id] = {
            "agent_id": agent_id,
            "purpose": purpose,
            "capabilities": capabilities,
            "created_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(hours=duration_hours),
            "request_count": 0,
            "max_requests": 1000
        }
        
        self.audit_log.append({
            "event": "session_created",
            "session_id": session_id,
            "agent_id": agent_id,
            "timestamp": datetime.now()
        })
        
        return session_id
    
    def revoke_session(self, session_id: str):
        """Remove an expired session and record the revocation"""
        self.agent_permissions.pop(session_id, None)
        self.audit_log.append({
            "event": "session_revoked",
            "session_id": session_id,
            "timestamp": datetime.now()
        })
    
    def check_agent_permission(self,
                               session_id: str,
                               requested_capability: AgentCapability,
                               resource: str) -> bool:
        """Check if agent has permission for action"""
        
        if session_id not in self.agent_permissions:
            return False
        
        session = self.agent_permissions[session_id]
        
        # Check session expiry
        if datetime.now() > session["expires_at"]:
            self.revoke_session(session_id)
            return False
        
        # Check rate limits
        if session["request_count"] >= session["max_requests"]:
            return False
        
        # Check capability
        if requested_capability not in session["capabilities"]:
            return False
        
        # Log the access attempt
        self.audit_log.append({
            "event": "permission_check",
            "session_id": session_id,
            "capability": requested_capability.value,
            "resource": resource,
            "granted": True,
            "timestamp": datetime.now()
        })
        
        session["request_count"] += 1
        return True
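
A usage sketch with illustrative agent and resource names:

# Grant a research agent read-only access for one hour
acl = LLMAgentAccessControl()
session = acl.create_agent_session(
    agent_id="research-agent-01",
    purpose="summarize public documentation",
    capabilities=[AgentCapability.READ_PUBLIC_DATA],
    duration_hours=1,
)

# Granted: the capability was issued at session creation
print(acl.check_agent_permission(session, AgentCapability.READ_PUBLIC_DATA, "docs/faq"))
# Denied: EXECUTE_CODE was never granted to this session
print(acl.check_agent_permission(session, AgentCapability.EXECUTE_CODE, "sandbox"))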

Security Considerations for AI Agents

  • Never grant agents permanent administrative permissions
  • Implement strict rate limiting to prevent resource exhaustion
  • Use separate permission models for different agent types
  • Require human approval for high-risk agent actions (see the sketch after this list)
  • Maintain comprehensive audit logs for all agent activities
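
A minimal sketch of such a human-in-the-loop gate, building on the LLMAgentAccessControl class above; the set of high-risk capabilities and the approval callback are assumptions:

# Human-approval gate for high-risk agent actions (sketch)
from typing import Callable

HIGH_RISK = {AgentCapability.EXECUTE_CODE, AgentCapability.MODIFY_SYSTEM}

def gated_check(acl: LLMAgentAccessControl,
                session_id: str,
                capability: AgentCapability,
                resource: str,
                request_approval: Callable[[str, str], bool]) -> bool:
    """Require an explicit human decision before any high-risk action."""
    if capability in HIGH_RISK and not request_approval(capability.value, resource):
        return False
    return acl.check_agent_permission(session_id, capability, resource)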

Policy Engines for AI Systems

Open Policy Agent (OPA) Integration

OPA provides a unified policy framework that works well with AI systems, enabling complex authorization logic without hardcoding rules.

# Advanced OPA policy for AI model governance
rego_ai_governance = """
package ai.governance

import future.keywords.if
import future.keywords.in
import future.keywords.contains

# Define risk levels for models
model_risk_level[model_id] = risk if {
    model := data.models[model_id]
    risk := calculate_risk(model)
}

calculate_risk(model) = "high" if {
    model.handles_pii == true
    model.accuracy < 0.99
} else = "medium" if {
    model.handles_pii == true
    model.accuracy >= 0.99
} else = "medium" if {
    model.business_critical == true
} else = "low"

# Production deployment rules
allow_production_deployment if {
    input.action == "deploy"
    input.environment == "production"
    model := data.models[input.model_id]
    
    # High-risk models need multiple approvals
    model_risk_level[input.model_id] == "high"
    count(model.approvals) >= 2
    
    # Check approver roles
    approver_roles := {approval.role | approval := model.approvals[_]}
    required_roles := {"ml_lead", "compliance_officer"}
    required_roles == approver_roles
    
    # Model must pass all tests
    model.test_coverage >= 0.9
    model.bias_test_passed == true
    model.security_scan_passed == true
}

# Data access based on purpose
allow_data_access if {
    input.action == "read"
    input.resource_type == "training_data"
    
    # Check purpose-based access
    input.purpose in ["model_training", "feature_engineering"]
    
    # Check data classification matches user clearance
    data_classification := data.datasets[input.dataset_id].classification
    user_clearance := input.user.clearance_level
    clearance_sufficient(data_classification, user_clearance)
}

clearance_sufficient(classification, clearance) if {
    classification == "public"
} else if {
    classification == "internal"
    clearance >= 1
} else if {
    classification == "confidential"
    clearance >= 2
} else if {
    classification == "restricted"
    clearance >= 3
}
"""

Casbin for ML Pipeline Authorization

Casbin provides flexible access control that adapts well to ML pipeline stages and model lifecycle management.

# Casbin configuration for ML pipeline
import casbin
from casbin.model import Model

class MLPipelineAuthorization:
    """Authorization for ML pipeline stages"""
    
    def __init__(self):
        self.enforcer = self.setup_casbin()
    
    def setup_casbin(self) -> casbin.Enforcer:
        """Configure Casbin for ML pipeline"""
        
        model_text = """
[request_definition]
r = sub, obj, act, stage

[policy_definition]
p = sub, obj, act, stage

[role_definition]
g = _, _
g2 = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && g2(r.obj, p.obj) && r.act == p.act && r.stage == p.stage
"""
        
        # Define pipeline stage policies
        policies = [
            ("data_engineer", "dataset", "create", "data_prep"),
            ("data_engineer", "feature", "engineer", "feature_eng"),
            ("ml_developer", "model", "train", "training"),
            ("ml_developer", "model", "evaluate", "validation"),
            ("ml_reviewer", "model", "approve", "review"),
            ("ml_ops", "model", "deploy", "deployment"),
            ("ml_ops", "model", "monitor", "production"),
        ]
        
        # Build the enforcer from the in-memory model text (pycasbin Model API)
        model = Model()
        model.load_model_from_text(model_text)
        enforcer = casbin.Enforcer(model)
        
        for policy in policies:
            enforcer.add_policy(*policy)
        
        return enforcer
    
    def can_execute_stage(self,
                          user: str,
                          resource: str,
                          action: str,
                          pipeline_stage: str) -> bool:
        """Check if user can execute pipeline stage"""
        return self.enforcer.enforce(user, resource, action, pipeline_stage)
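
A quick usage sketch; because the g matcher falls back to exact matching when no role links are defined, role names can double as subjects here:

# Verify stage permissions directly against the seeded policies
auth = MLPipelineAuthorization()

print(auth.can_execute_stage("ml_developer", "model", "train", "training"))     # True
print(auth.can_execute_stage("ml_developer", "model", "deploy", "deployment"))  # False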

Best Practices for AI Access Control

  • Start with RBAC: Use role-based control for basic permissions and organizational structure
  • Add ABAC for complexity: Layer attribute-based rules for fine-grained, context-aware decisions
  • Implement least privilege: Grant minimum necessary permissions for each role or agent
  • Time-bound permissions: Use temporary sessions for agents and automated processes
  • Comprehensive auditing: Log all access decisions and policy evaluations
  • Regular reviews: Periodically audit and update access policies
  • Separate environments: Use different permission models for dev, staging, and production
  • Human-in-the-loop: Require human approval for high-risk AI agent actions
