diff --git a/keepercli-package/src/keepercli/commands/security_audit_report.py b/keepercli-package/src/keepercli/commands/security_audit_report.py index 4e5efef..dbba2cf 100644 --- a/keepercli-package/src/keepercli/commands/security_audit_report.py +++ b/keepercli-package/src/keepercli/commands/security_audit_report.py @@ -164,11 +164,22 @@ def _resolve_node_ids(self, enterprise_data, nodes: Optional[List[str]]) -> List return [] node_ids = [] + unresolved = [] for name_or_id in nodes: + matched = False for n in enterprise_data.nodes.get_all_entities(): if name_or_id == str(n.node_id) or name_or_id == n.name: node_ids.append(n.node_id) + matched = True break + if not matched: + unresolved.append(name_or_id) + + if unresolved: + raise base.CommandError( + f'Invalid node(s): {", ".join(repr(x) for x in unresolved)}. ' + 'Provide a valid node name or node UID.' + ) return node_ids def _format_report( diff --git a/keepercli-package/src/keepercli/commands/share_report.py b/keepercli-package/src/keepercli/commands/share_report.py index 9aafc6e..e38db78 100644 --- a/keepercli-package/src/keepercli/commands/share_report.py +++ b/keepercli-package/src/keepercli/commands/share_report.py @@ -126,7 +126,10 @@ def execute(self, context: KeeperParams, **kwargs) -> Any: if config.folders_only: return self._generate_folders_report(generator, output_format, output_file) if config.show_ownership: - return self._generate_ownership_report(generator, output_format, output_file, verbose) + return self._generate_ownership_report( + generator, output_format, output_file, verbose, + show_share_date=config.show_share_date + ) if config.record_filter: return self._generate_record_detail_report(generator, config) if config.user_filter: @@ -158,15 +161,18 @@ def _generate_ownership_report( generator: share_report.ShareReportGenerator, output_format: str, output_file: Optional[str], - verbose: bool + verbose: bool, + show_share_date: bool = False ) -> Optional[str]: """Generate record ownership report.""" entries = generator.generate_records_report() - headers = share_report.ShareReportGenerator.get_headers(ownership=True) + headers = share_report.ShareReportGenerator.get_headers( + ownership=True, show_share_date=show_share_date + ) table = [ [e.record_owner, e.record_uid, e.record_title, e.shared_with if verbose else e.shared_with_count, - '\n'.join(e.folder_paths)] + '\n'.join(e.folder_paths)] + ([e.share_date or ''] if show_share_date else []) for e in entries ] diff --git a/keepersdk-package/src/keepersdk/vault/share_report.py b/keepersdk-package/src/keepersdk/vault/share_report.py index 6606701..782566f 100644 --- a/keepersdk-package/src/keepersdk/vault/share_report.py +++ b/keepersdk-package/src/keepersdk/vault/share_report.py @@ -16,6 +16,7 @@ import dataclasses import datetime +import logging from typing import Optional, List, Dict, Any, Iterable, Set, NamedTuple from . import vault_online, vault_types, vault_utils @@ -23,6 +24,11 @@ from ..authentication import keeper_auth from ..enterprise import enterprise_data as enterprise_data_types +_SHARE_DATE_EVENT_TYPES = ['folder_add_record', 'record_add'] +_AUDIT_EVENT_LIMIT = 1000 +_SHARE_DATE_PAGINATION_MAX = 100 +_logger = logging.getLogger(__name__) + @dataclasses.dataclass class ShareReportEntry: @@ -217,21 +223,35 @@ def generate_records_report(self) -> List[ShareReportEntry]: share_info_map = self._fetch_share_info(list(record_uids)) or {} entries: List[ShareReportEntry] = [] processed_uids: Set[str] = set() - - user_filter_lower = {u.lower() for u in self._config.user_filter} if self._config.user_filter else None - + + user_filter_lower = ( + {u.lower() for u in self._config.user_filter} if self._config.user_filter else None + ) + share_date_map: Dict[str, str] = {} + if self._config.show_share_date and self._auth and self._enterprise: + sf_records = ( + self._get_shared_folder_records_for_user(user_filter_lower) + if user_filter_lower is not None + else self._get_all_shared_folder_records() + ) + all_uids = set(share_info_map.keys()) | sf_records + if all_uids: + share_date_map = self._fetch_share_dates(list(all_uids)) + for uid, share_info in share_info_map.items(): if not self._should_include_record(share_info): continue - + if user_filter_lower and not self._record_matches_user_filter(share_info, user_filter_lower): continue - - entries.append(self._build_share_entry(share_info)) + + entries.append(self._build_share_entry(share_info, share_date_map.get(uid))) processed_uids.add(uid) - - self._add_shared_folder_records(entries, processed_uids, share_info_map, user_filter_lower) - + + self._add_shared_folder_records( + entries, processed_uids, share_info_map, user_filter_lower, share_date_map + ) + return entries def _should_include_record(self, share_info: RecordShareInfo) -> bool: @@ -252,42 +272,45 @@ def _add_shared_folder_records( entries: List[ShareReportEntry], processed_uids: Set[str], share_info_map: Dict[str, RecordShareInfo], - user_filter_lower: Optional[Set[str]] + user_filter_lower: Optional[Set[str]], + share_date_map: Optional[Dict[str, str]] = None ) -> None: """Add records from shared folders that weren't returned by the share API.""" should_include = ( - self._config.user_filter or - self._config.show_ownership or + self._config.user_filter or + self._config.show_ownership or not self._config.record_filter ) - + if not should_include: return - + sf_records = ( self._get_shared_folder_records_for_user(user_filter_lower) if user_filter_lower else self._get_all_shared_folder_records() ) - + share_dates = share_date_map or {} + for record_uid in sf_records: if record_uid in processed_uids: continue - + record_info = self._vault.vault_data.get_record(record_uid) if not record_info: continue - + folder_paths = self._get_folder_paths(record_uid) owner = self._get_owner_from_share_info(share_info_map, record_uid) - + entries.append(ShareReportEntry( record_uid=record_uid, record_title=record_info.title, record_owner=owner, shared_with='', shared_with_count=0, - folder_paths=folder_paths + folder_paths=folder_paths, + share_date=share_dates.get(record_uid) )) processed_uids.add(record_uid) @@ -454,19 +477,27 @@ def generate_report_rows(self) -> Iterable[List[Any]]: elif self._config.show_ownership: for entry in self.generate_records_report(): shared_info = entry.shared_with if self._config.verbose else entry.shared_with_count - yield [entry.record_owner, entry.record_uid, entry.record_title, + row = [entry.record_owner, entry.record_uid, entry.record_title, shared_info, '\n'.join(entry.folder_paths)] + if self._config.show_share_date: + row.append(entry.share_date or '') + yield row else: for entry in self.generate_summary_report(): yield [entry.shared_to, entry.record_count, entry.shared_folder_count] @staticmethod - def get_headers(folders_only: bool = False, ownership: bool = False) -> List[str]: + def get_headers( + folders_only: bool = False, + ownership: bool = False, + show_share_date: bool = False + ) -> List[str]: """Get report headers based on configuration. Args: folders_only: True if generating shared folders report ownership: True if generating ownership report + show_share_date: True to include share date column (ownership report only) Returns: List of header column names @@ -474,7 +505,10 @@ def get_headers(folders_only: bool = False, ownership: bool = False) -> List[str if folders_only: return ['folder_uid', 'folder_name', 'shared_to', 'permissions', 'folder_path'] if ownership: - return ['record_owner', 'record_uid', 'record_title', 'shared_with', 'folder_path'] + headers = ['record_owner', 'record_uid', 'record_title', 'shared_with', 'folder_path'] + if show_share_date: + headers.append('share_date') + return headers return ['shared_to', 'records', 'shared_folders'] def _resolve_record_uids(self, record_refs: List[str]) -> Set[str]: @@ -504,7 +538,9 @@ def _fetch_share_info(self, record_uids: List[str]) -> Dict[str, RecordShareInfo try: shares_data = share_management_utils.get_record_shares( - self._vault, record_uids, is_share_admin=False + self._vault, + record_uids, + is_share_admin=self._config.show_share_date, ) if not shares_data: @@ -533,7 +569,70 @@ def _fetch_share_info(self, record_uids: List[str]) -> Dict[str, RecordShareInfo pass return result - + + def _fetch_share_dates(self, record_uids: List[str]) -> Dict[str, str]: + """Fetch earliest share-related audit event date per record (enterprise only). + Returns dict of record_uid -> formatted date string, or empty if not available. + """ + if not record_uids or not self._auth or not self._enterprise: + return {} + record_uid_set = set(record_uids) + min_ts: Dict[str, int] = {} + search_min_ts = int( + (datetime.datetime.now() - datetime.timedelta(days=365 * 5)).timestamp() + ) + audit_filter: Dict[str, Any] = { + 'audit_event_type': _SHARE_DATE_EVENT_TYPES, + 'created': {'min': search_min_ts}, + 'record_uid': record_uids, + } + rq: Dict[str, Any] = { + 'command': 'get_audit_event_reports', + 'scope': 'enterprise', + 'report_type': 'raw', + 'filter': audit_filter, + 'limit': _AUDIT_EVENT_LIMIT, + 'order': 'ascending', + } + iterations = 0 + try: + while iterations < _SHARE_DATE_PAGINATION_MAX: + iterations += 1 + rs = self._auth.execute_auth_command(rq) + events = rs.get('audit_event_overview_report_rows') or [] + if not events: + break + for event in events: + uid = event.get('record_uid') or '' + if uid not in record_uid_set: + continue + ts = event.get('created') + if ts is None: + continue + try: + ts_int = int(ts) + except (TypeError, ValueError): + continue + if uid not in min_ts or ts_int < min_ts[uid]: + min_ts[uid] = ts_int + if len(events) < _AUDIT_EVENT_LIMIT: + break + last_ts = max(int(e.get('created', 0)) for e in events) + audit_filter['created'] = {'min': last_ts + 1} + except Exception as e: + _logger.debug('Failed to fetch share dates from audit: %s', e) + # Format as date string (created may be Unix seconds or milliseconds) + result: Dict[str, str] = {} + for uid, ts in min_ts.items(): + try: + if ts > 1e12: + ts = ts // 1000 + dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc) + result[uid] = dt.strftime('%Y-%m-%d %H:%M UTC') + except (OSError, ValueError): + result[uid] = str(ts) + return result + def _parse_user_permissions(self, shares: Dict) -> List[UserPermissionInfo]: """Parse user permissions from share data.""" permissions = [] @@ -554,22 +653,25 @@ def _parse_user_permissions(self, shares: Dict) -> List[UserPermissionInfo]: )) return permissions - def _build_share_entry(self, share_info: RecordShareInfo) -> ShareReportEntry: + def _build_share_entry( + self, share_info: RecordShareInfo, share_date: Optional[str] = None + ) -> ShareReportEntry: """Build a ShareReportEntry from RecordShareInfo.""" owner = self._get_owner_from_share_info({share_info.record_uid: share_info}, share_info.record_uid) non_owner_shares = [p for p in share_info.user_permissions if not p.is_owner] - + shared_with = '' if self._config.verbose: shared_with = self._format_verbose_permissions(share_info) - + return ShareReportEntry( record_uid=share_info.record_uid, record_title=share_info.record_title, record_owner=owner, shared_with=shared_with, shared_with_count=len(non_owner_shares), - folder_paths=share_info.folder_paths + folder_paths=share_info.folder_paths, + share_date=share_date ) def _format_verbose_permissions(self, share_info: RecordShareInfo) -> str: