diff --git a/cycode/cli/apps/ai_guardrails/consts.py b/cycode/cli/apps/ai_guardrails/consts.py index 8714ec10..15a08074 100644 --- a/cycode/cli/apps/ai_guardrails/consts.py +++ b/cycode/cli/apps/ai_guardrails/consts.py @@ -25,6 +25,13 @@ class PolicyMode(str, Enum): WARN = 'warn' +class InstallMode(str, Enum): + """Installation mode for ai-guardrails install command.""" + + REPORT = 'report' + BLOCK = 'block' + + class IDEConfig(NamedTuple): """Configuration for an AI IDE.""" @@ -76,12 +83,15 @@ def _get_claude_code_hooks_dir() -> Path: # Command used in hooks CYCODE_SCAN_PROMPT_COMMAND = 'cycode ai-guardrails scan' +CYCODE_AUTH_CHECK_COMMAND = "if cycode status 2>&1 | grep -q 'Is authenticated: False'; then cycode auth 2>&1; fi" -def _get_cursor_hooks_config() -> dict: +def _get_cursor_hooks_config(async_mode: bool = False) -> dict: """Get Cursor-specific hooks configuration.""" config = IDE_CONFIGS[AIIDEType.CURSOR] - hooks = {event: [{'command': CYCODE_SCAN_PROMPT_COMMAND}] for event in config.hook_events} + command = f'{CYCODE_SCAN_PROMPT_COMMAND} &' if async_mode else CYCODE_SCAN_PROMPT_COMMAND + hooks = {event: [{'command': command}] for event in config.hook_events} + hooks['sessionStart'] = [{'command': CYCODE_AUTH_CHECK_COMMAND}] return { 'version': 1, @@ -89,7 +99,7 @@ def _get_cursor_hooks_config() -> dict: } -def _get_claude_code_hooks_config() -> dict: +def _get_claude_code_hooks_config(async_mode: bool = False) -> dict: """Get Claude Code-specific hooks configuration. Claude Code uses a different hook format with nested structure: @@ -98,36 +108,48 @@ def _get_claude_code_hooks_config() -> dict: """ command = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide claude-code' + hook_entry = {'type': 'command', 'command': command} + if async_mode: + hook_entry['async'] = True + hook_entry['timeout'] = 20 + return { 'hooks': { + 'SessionStart': [ + { + 'matcher': '', + 'hooks': [{'type': 'command', 'command': CYCODE_AUTH_CHECK_COMMAND}], + } + ], 'UserPromptSubmit': [ { - 'hooks': [{'type': 'command', 'command': command}], + 'hooks': [hook_entry.copy()], } ], 'PreToolUse': [ { 'matcher': 'Read', - 'hooks': [{'type': 'command', 'command': command}], + 'hooks': [hook_entry.copy()], }, { 'matcher': 'mcp__.*', - 'hooks': [{'type': 'command', 'command': command}], + 'hooks': [hook_entry.copy()], }, ], }, } -def get_hooks_config(ide: AIIDEType) -> dict: +def get_hooks_config(ide: AIIDEType, async_mode: bool = False) -> dict: """Get the hooks configuration for a specific IDE. Args: ide: The AI IDE type + async_mode: If True, hooks run asynchronously (non-blocking) Returns: Dict with hooks configuration for the specified IDE """ if ide == AIIDEType.CLAUDE_CODE: - return _get_claude_code_hooks_config() - return _get_cursor_hooks_config() + return _get_claude_code_hooks_config(async_mode=async_mode) + return _get_cursor_hooks_config(async_mode=async_mode) diff --git a/cycode/cli/apps/ai_guardrails/hooks_manager.py b/cycode/cli/apps/ai_guardrails/hooks_manager.py index b8d43c43..24b6f6ff 100644 --- a/cycode/cli/apps/ai_guardrails/hooks_manager.py +++ b/cycode/cli/apps/ai_guardrails/hooks_manager.py @@ -5,17 +5,21 @@ Supports multiple IDEs: Cursor, Claude Code (future). """ +import copy import json from pathlib import Path from typing import Optional +import yaml + from cycode.cli.apps.ai_guardrails.consts import ( - CYCODE_SCAN_PROMPT_COMMAND, DEFAULT_IDE, IDE_CONFIGS, AIIDEType, + PolicyMode, get_hooks_config, ) +from cycode.cli.apps.ai_guardrails.scan.consts import DEFAULT_POLICY, POLICY_FILE_NAME from cycode.logger import get_logger logger = get_logger('AI Guardrails Hooks') @@ -58,6 +62,13 @@ def save_hooks_file(hooks_path: Path, hooks_config: dict) -> bool: return False +_CYCODE_COMMAND_MARKERS = ('cycode ai-guardrails', 'cycode auth') + + +def _is_cycode_command(command: str) -> bool: + return any(marker in command for marker in _CYCODE_COMMAND_MARKERS) + + def is_cycode_hook_entry(entry: dict) -> bool: """Check if a hook entry is from cycode-cli. @@ -68,7 +79,7 @@ def is_cycode_hook_entry(entry: dict) -> bool: """ # Check Cursor format (flat command) command = entry.get('command', '') - if CYCODE_SCAN_PROMPT_COMMAND in command: + if _is_cycode_command(command): return True # Check Claude Code format (nested hooks array) @@ -76,14 +87,58 @@ def is_cycode_hook_entry(entry: dict) -> bool: for hook in hooks: if isinstance(hook, dict): hook_command = hook.get('command', '') - if CYCODE_SCAN_PROMPT_COMMAND in hook_command: + if _is_cycode_command(hook_command): return True return False +def _load_policy(policy_path: Path) -> dict: + """Load existing policy file merged with defaults, or return defaults if not found.""" + if not policy_path.exists(): + return copy.deepcopy(DEFAULT_POLICY) + try: + existing = yaml.safe_load(policy_path.read_text(encoding='utf-8')) or {} + except Exception: + existing = {} + return {**copy.deepcopy(DEFAULT_POLICY), **existing} + + +def create_policy_file(scope: str, mode: PolicyMode, repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Create or update the ai-guardrails.yaml policy file. + + If the file already exists, only the mode field is updated. + If it doesn't exist, a new file is created from the default policy. + + Args: + scope: 'user' for user-level, 'repo' for repository-level + mode: The policy mode to set + repo_path: Repository path (required if scope is 'repo') + + Returns: + Tuple of (success, message) + """ + config_dir = repo_path / '.cycode' if scope == 'repo' and repo_path else Path.home() / '.cycode' + policy_path = config_dir / POLICY_FILE_NAME + + policy = _load_policy(policy_path) + + policy['mode'] = mode.value + + try: + config_dir.mkdir(parents=True, exist_ok=True) + policy_path.write_text(yaml.dump(policy, default_flow_style=False, sort_keys=False), encoding='utf-8') + return True, f'AI guardrails policy ({mode.value} mode) set: {policy_path}' + except Exception as e: + logger.error('Failed to create policy file', exc_info=e) + return False, f'Failed to create policy file: {policy_path}' + + def install_hooks( - scope: str = 'user', repo_path: Optional[Path] = None, ide: AIIDEType = DEFAULT_IDE + scope: str = 'user', + repo_path: Optional[Path] = None, + ide: AIIDEType = DEFAULT_IDE, + report_mode: bool = False, ) -> tuple[bool, str]: """ Install Cycode AI guardrails hooks. @@ -92,6 +147,7 @@ def install_hooks( scope: 'user' for user-level hooks, 'repo' for repository-level hooks repo_path: Repository path (required if scope is 'repo') ide: The AI IDE type (default: Cursor) + report_mode: If True, install hooks in async mode (non-blocking) Returns: Tuple of (success, message) @@ -104,7 +160,7 @@ def install_hooks( existing.setdefault('hooks', {}) # Get IDE-specific hooks configuration - hooks_config = get_hooks_config(ide) + hooks_config = get_hooks_config(ide, async_mode=report_mode) # Add/update Cycode hooks for event, entries in hooks_config['hooks'].items(): diff --git a/cycode/cli/apps/ai_guardrails/install_command.py b/cycode/cli/apps/ai_guardrails/install_command.py index a72d5d4c..a92a978f 100644 --- a/cycode/cli/apps/ai_guardrails/install_command.py +++ b/cycode/cli/apps/ai_guardrails/install_command.py @@ -11,8 +11,8 @@ validate_and_parse_ide, validate_scope, ) -from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS, AIIDEType -from cycode.cli.apps.ai_guardrails.hooks_manager import install_hooks +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS, AIIDEType, InstallMode, PolicyMode +from cycode.cli.apps.ai_guardrails.hooks_manager import create_policy_file, install_hooks def install_command( @@ -43,6 +43,15 @@ def install_command( resolve_path=True, ), ] = None, + mode: Annotated[ + InstallMode, + typer.Option( + '--mode', + '-m', + help='Installation mode: "report" for async non-blocking hooks with warn policy, ' + '"block" for sync blocking hooks.', + ), + ] = InstallMode.REPORT, ) -> None: """Install AI guardrails hooks for supported IDEs. @@ -50,7 +59,8 @@ def install_command( and MCP tool calls for secrets before they are sent to AI models. Examples: - cycode ai-guardrails install # Install for all projects (user scope) + cycode ai-guardrails install # Install in report mode (default) + cycode ai-guardrails install --mode block # Install in block mode cycode ai-guardrails install --scope repo # Install for current repo only cycode ai-guardrails install --ide cursor # Install for Cursor IDE cycode ai-guardrails install --ide all # Install for all supported IDEs @@ -66,7 +76,8 @@ def install_command( results: list[tuple[str, bool, str]] = [] for current_ide in ides_to_install: ide_name = IDE_CONFIGS[current_ide].name - success, message = install_hooks(scope, repo_path, ide=current_ide) + report_mode = mode == InstallMode.REPORT + success, message = install_hooks(scope, repo_path, ide=current_ide, report_mode=report_mode) results.append((ide_name, success, message)) # Report results for each IDE @@ -81,14 +92,31 @@ def install_command( all_success = False if any_success: - console.print() - console.print('[bold]Next steps:[/]') - successful_ides = [name for name, success, _ in results if success] - ide_list = ', '.join(successful_ides) - console.print(f'1. Restart {ide_list} to activate the hooks') - console.print('2. (Optional) Customize policy in ~/.cycode/ai-guardrails.yaml') - console.print() - console.print('[dim]The hooks will scan prompts, file reads, and MCP tool calls for secrets.[/]') + policy_mode = PolicyMode.WARN if mode == InstallMode.REPORT else PolicyMode.BLOCK + _install_policy(scope, repo_path, policy_mode) + _print_next_steps(results, mode) if not all_success: raise typer.Exit(1) + + +def _install_policy(scope: str, repo_path: Optional[Path], policy_mode: PolicyMode) -> None: + policy_success, policy_message = create_policy_file(scope, policy_mode, repo_path) + if policy_success: + console.print(f'[green]✓[/] {policy_message}') + else: + console.print(f'[red]✗[/] {policy_message}', style='bold red') + + +def _print_next_steps(results: list[tuple[str, bool, str]], mode: InstallMode) -> None: + console.print() + console.print('[bold]Next steps:[/]') + successful_ides = [name for name, success, _ in results if success] + ide_list = ', '.join(successful_ides) + console.print(f'1. Restart {ide_list} to activate the hooks') + console.print('2. (Optional) Customize policy in ~/.cycode/ai-guardrails.yaml') + console.print() + if mode == InstallMode.REPORT: + console.print('[dim]Report mode: hooks run async (non-blocking) and policy is set to warn.[/]') + else: + console.print('[dim]The hooks will scan prompts, file reads, and MCP tool calls for secrets.[/]') diff --git a/tests/cli/commands/ai_guardrails/scan/test_payload.py b/tests/cli/commands/ai_guardrails/scan/test_payload.py index 27c3010f..e17d833d 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_payload.py +++ b/tests/cli/commands/ai_guardrails/scan/test_payload.py @@ -29,6 +29,7 @@ def test_from_cursor_payload_prompt_event() -> None: assert unified.ide_provider == 'cursor' assert unified.ide_version == '0.42.0' assert unified.prompt == 'Test prompt' + assert type(unified.ide_provider) is str def test_from_cursor_payload_file_read_event() -> None: @@ -153,6 +154,7 @@ def test_from_claude_code_payload_prompt_event() -> None: assert unified.conversation_id == 'session-123' assert unified.ide_provider == 'claude-code' assert unified.prompt == 'Test prompt for Claude Code' + assert type(unified.ide_provider) is str def test_from_claude_code_payload_file_read_event() -> None: diff --git a/tests/cli/commands/ai_guardrails/test_hooks_manager.py b/tests/cli/commands/ai_guardrails/test_hooks_manager.py index f0dec6f7..a0eb6e09 100644 --- a/tests/cli/commands/ai_guardrails/test_hooks_manager.py +++ b/tests/cli/commands/ai_guardrails/test_hooks_manager.py @@ -1,6 +1,18 @@ """Tests for AI guardrails hooks manager.""" -from cycode.cli.apps.ai_guardrails.hooks_manager import is_cycode_hook_entry +from pathlib import Path + +import yaml +from pyfakefs.fake_filesystem import FakeFilesystem + +from cycode.cli.apps.ai_guardrails.consts import ( + CYCODE_AUTH_CHECK_COMMAND, + CYCODE_SCAN_PROMPT_COMMAND, + AIIDEType, + PolicyMode, + get_hooks_config, +) +from cycode.cli.apps.ai_guardrails.hooks_manager import create_policy_file, is_cycode_hook_entry def test_is_cycode_hook_entry_cursor_format() -> None: @@ -51,3 +63,122 @@ def test_is_cycode_hook_entry_partial_match() -> None: entry = {'command': 'cycode ai-guardrails scan --verbose'} assert is_cycode_hook_entry(entry) is True + + +def test_get_hooks_config_cursor_sync() -> None: + """Test Cursor hooks config in default (sync) mode.""" + config = get_hooks_config(AIIDEType.CURSOR) + hooks = config['hooks'] + scan_hooks = {k: v for k, v in hooks.items() if k != 'sessionStart'} + for entries in scan_hooks.values(): + for entry in entries: + assert entry['command'] == CYCODE_SCAN_PROMPT_COMMAND + assert '&' not in entry['command'] + + +def test_get_hooks_config_cursor_async() -> None: + """Test Cursor hooks config in async mode appends & to command.""" + config = get_hooks_config(AIIDEType.CURSOR, async_mode=True) + hooks = config['hooks'] + scan_hooks = {k: v for k, v in hooks.items() if k != 'sessionStart'} + for entries in scan_hooks.values(): + for entry in entries: + assert entry['command'].endswith('&') + assert CYCODE_SCAN_PROMPT_COMMAND in entry['command'] + + +def test_get_hooks_config_cursor_session_start() -> None: + """Test Cursor hooks config includes sessionStart auth check.""" + config = get_hooks_config(AIIDEType.CURSOR) + assert 'sessionStart' in config['hooks'] + entries = config['hooks']['sessionStart'] + assert len(entries) == 1 + assert entries[0]['command'] == CYCODE_AUTH_CHECK_COMMAND + + +def test_get_hooks_config_claude_code_sync() -> None: + """Test Claude Code hooks config in default (sync) mode.""" + config = get_hooks_config(AIIDEType.CLAUDE_CODE) + scan_events = {k: v for k, v in config['hooks'].items() if k != 'SessionStart'} + for event_entries in scan_events.values(): + for event_entry in event_entries: + for hook in event_entry['hooks']: + assert 'async' not in hook + assert 'timeout' not in hook + + +def test_get_hooks_config_claude_code_async() -> None: + """Test Claude Code hooks config in async mode adds async and timeout.""" + config = get_hooks_config(AIIDEType.CLAUDE_CODE, async_mode=True) + scan_events = {k: v for k, v in config['hooks'].items() if k != 'SessionStart'} + for event_entries in scan_events.values(): + for event_entry in event_entries: + for hook in event_entry['hooks']: + assert hook['async'] is True + + +def test_get_hooks_config_claude_code_session_start() -> None: + """Test Claude Code hooks config includes SessionStart auth check.""" + config = get_hooks_config(AIIDEType.CLAUDE_CODE) + assert 'SessionStart' in config['hooks'] + entries = config['hooks']['SessionStart'] + assert len(entries) == 1 + assert entries[0]['hooks'][0]['command'] == CYCODE_AUTH_CHECK_COMMAND + + +def test_create_policy_file_warn(fs: FakeFilesystem) -> None: + """Test creating warn-mode policy file.""" + fs.create_dir(Path.home()) + success, message = create_policy_file('user', PolicyMode.WARN) + + assert success is True + assert 'warn mode' in message + + policy_path = Path.home() / '.cycode' / 'ai-guardrails.yaml' + assert policy_path.exists() + + policy = yaml.safe_load(policy_path.read_text()) + assert policy['mode'] == 'warn' + + +def test_create_policy_file_block(fs: FakeFilesystem) -> None: + """Test creating block-mode policy file.""" + fs.create_dir(Path.home()) + success, message = create_policy_file('user', PolicyMode.BLOCK) + + assert success is True + assert 'block mode' in message + + policy_path = Path.home() / '.cycode' / 'ai-guardrails.yaml' + policy = yaml.safe_load(policy_path.read_text()) + assert policy['mode'] == 'block' + + +def test_create_policy_file_updates_existing(fs: FakeFilesystem) -> None: + """Test that re-running only updates mode and preserves other customizations.""" + policy_dir = Path.home() / '.cycode' + fs.create_dir(policy_dir) + policy_path = policy_dir / 'ai-guardrails.yaml' + policy_path.write_text(yaml.dump({'version': 1, 'mode': 'warn', 'custom_field': 'keep_me'})) + + success, _ = create_policy_file('user', PolicyMode.BLOCK) + + assert success is True + policy = yaml.safe_load(policy_path.read_text()) + assert policy['mode'] == 'block' + assert policy['custom_field'] == 'keep_me' + + +def test_create_policy_file_repo_scope(fs: FakeFilesystem) -> None: + """Test creating policy file in repo scope.""" + repo_path = Path('/my-repo') + fs.create_dir(repo_path) + + success, message = create_policy_file('repo', PolicyMode.WARN, repo_path=repo_path) + + assert success is True + policy_path = repo_path / '.cycode' / 'ai-guardrails.yaml' + assert policy_path.exists() + + policy = yaml.safe_load(policy_path.read_text()) + assert policy['mode'] == 'warn'