diff --git a/application/external_apps/bulkloader/requirements.txt b/application/external_apps/bulkloader/requirements.txt index e31bd339..2cee8a3b 100644 --- a/application/external_apps/bulkloader/requirements.txt +++ b/application/external_apps/bulkloader/requirements.txt @@ -1,4 +1,3 @@ -requests -msal -logging -dotenv \ No newline at end of file +requests==2.32.4 +msal==1.31.0 +python-dotenv==0.21.0 \ No newline at end of file diff --git a/application/external_apps/databaseseeder/requirements.txt b/application/external_apps/databaseseeder/requirements.txt index 72d2766e..dfdf02ed 100644 --- a/application/external_apps/databaseseeder/requirements.txt +++ b/application/external_apps/databaseseeder/requirements.txt @@ -1,5 +1,3 @@ -requests -msal -logging -dotenv -flask +requests==2.32.4 +msal==1.31.0 +python-dotenv==0.21.0 diff --git a/application/single_app/config.py b/application/single_app/config.py index f8931bb6..dd712ec8 100644 --- a/application/single_app/config.py +++ b/application/single_app/config.py @@ -46,9 +46,9 @@ session, send_from_directory, send_file, - Markup, current_app ) +from markupsafe import Markup from werkzeug.utils import secure_filename from datetime import datetime, timezone, timedelta from functools import wraps @@ -94,7 +94,7 @@ EXECUTOR_TYPE = 'thread' EXECUTOR_MAX_WORKERS = 30 SESSION_TYPE = 'filesystem' -VERSION = "0.240.006" +VERSION = "0.240.020" SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') diff --git a/application/single_app/functions_message_artifacts.py b/application/single_app/functions_message_artifacts.py new file mode 100644 index 00000000..cf57412d --- /dev/null +++ b/application/single_app/functions_message_artifacts.py @@ -0,0 +1,446 @@ +# functions_message_artifacts.py +"""Helpers for storing large assistant-side payloads outside primary chat items.""" + +import json +from copy import deepcopy +from typing import Any, Dict, List, Optional, Tuple + + +ASSISTANT_ARTIFACT_ROLE = 
'assistant_artifact' +ASSISTANT_ARTIFACT_CHUNK_ROLE = 'assistant_artifact_chunk' +ASSISTANT_ARTIFACT_KIND_AGENT_CITATION = 'agent_citation' +ASSISTANT_ARTIFACT_CHUNK_SIZE = 180000 +COMPACT_VALUE_MAX_STRING = 400 +COMPACT_VALUE_MAX_LIST_ITEMS = 5 +COMPACT_VALUE_MAX_DICT_KEYS = 12 +COMPACT_VALUE_MAX_DEPTH = 3 +TABULAR_ARGUMENT_EXCLUDE_KEYS = { + 'conversation_id', + 'group_id', + 'public_workspace_id', + 'source', + 'user_id', +} + + +def is_assistant_artifact_role(role: Optional[str]) -> bool: + """Return True for auxiliary assistant artifact records stored in messages.""" + return role in {ASSISTANT_ARTIFACT_ROLE, ASSISTANT_ARTIFACT_CHUNK_ROLE} + + +def filter_assistant_artifact_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Return only primary conversation items, excluding assistant artifacts and chunks.""" + return [item for item in items or [] if not is_assistant_artifact_role(item.get('role'))] + + +def make_json_serializable(value: Any) -> Any: + """Convert nested values into JSON-serializable structures.""" + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(key): make_json_serializable(item) for key, item in value.items()} + if isinstance(value, (list, tuple)): + return [make_json_serializable(item) for item in value] + return str(value) + + +def build_agent_citation_artifact_documents( + conversation_id: str, + assistant_message_id: str, + agent_citations: List[Dict[str, Any]], + created_timestamp: str, + user_info: Optional[Dict[str, Any]] = None, +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """Return compact citations plus auxiliary message records for full raw payloads.""" + compact_citations: List[Dict[str, Any]] = [] + artifact_docs: List[Dict[str, Any]] = [] + + for index, citation in enumerate(agent_citations or [], start=1): + serializable_citation = make_json_serializable(citation) + artifact_id = f"{assistant_message_id}_artifact_{index}" + + 
compact_citations.append( + build_compact_agent_citation(serializable_citation, artifact_id=artifact_id) + ) + artifact_docs.extend( + _build_artifact_documents( + conversation_id=conversation_id, + assistant_message_id=assistant_message_id, + artifact_id=artifact_id, + artifact_kind=ASSISTANT_ARTIFACT_KIND_AGENT_CITATION, + payload={ + 'schema_version': 1, + 'artifact_kind': ASSISTANT_ARTIFACT_KIND_AGENT_CITATION, + 'citation': serializable_citation, + }, + created_timestamp=created_timestamp, + artifact_index=index, + user_info=user_info, + citation=serializable_citation if isinstance(serializable_citation, dict) else None, + ) + ) + + return compact_citations, artifact_docs + + +def build_message_artifact_payload_map(raw_messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: + """Reassemble assistant artifact records into a payload map keyed by artifact id.""" + artifact_messages: Dict[str, Dict[str, Any]] = {} + artifact_chunks: Dict[str, Dict[int, str]] = {} + + for message in raw_messages or []: + role = message.get('role') + if role == ASSISTANT_ARTIFACT_ROLE: + artifact_messages[message.get('id')] = message + elif role == ASSISTANT_ARTIFACT_CHUNK_ROLE: + parent_id = message.get('parent_message_id') + if not parent_id: + continue + artifact_chunks.setdefault(parent_id, {})[ + int((message.get('metadata') or {}).get('chunk_index', 0)) + ] = str(message.get('content', '')) + + artifact_payloads: Dict[str, Dict[str, Any]] = {} + for artifact_id, artifact_message in artifact_messages.items(): + content = str(artifact_message.get('content', '')) + metadata = artifact_message.get('metadata', {}) or {} + + if metadata.get('is_chunked'): + total_chunks = int(metadata.get('total_chunks', 1) or 1) + chunk_map = artifact_chunks.get(artifact_id, {}) + rebuilt_chunks = [content] + for chunk_index in range(1, total_chunks): + rebuilt_chunks.append(chunk_map.get(chunk_index, '')) + content = ''.join(rebuilt_chunks) + + try: + parsed = json.loads(content) + except 
Exception: + continue + + if isinstance(parsed, dict): + artifact_payloads[artifact_id] = parsed + + return artifact_payloads + + +def hydrate_agent_citations_from_artifacts( + messages: List[Dict[str, Any]], + artifact_payload_map: Dict[str, Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Return messages with agent citations inflated from assistant artifact records.""" + hydrated_messages: List[Dict[str, Any]] = [] + + for message in messages or []: + hydrated_message = deepcopy(message) + agent_citations = hydrated_message.get('agent_citations') + if not isinstance(agent_citations, list) or not agent_citations: + hydrated_messages.append(hydrated_message) + continue + + hydrated_citations = [] + for citation in agent_citations: + if not isinstance(citation, dict): + hydrated_citations.append(citation) + continue + + artifact_id = citation.get('artifact_id') + artifact_payload = artifact_payload_map.get(str(artifact_id or '')) + raw_citation = artifact_payload.get('citation') if isinstance(artifact_payload, dict) else None + if isinstance(raw_citation, dict): + merged_citation = deepcopy(raw_citation) + merged_citation.setdefault('artifact_id', artifact_id) + merged_citation.setdefault('raw_payload_externalized', True) + hydrated_citations.append(merged_citation) + else: + hydrated_citations.append(citation) + + hydrated_message['agent_citations'] = hydrated_citations + hydrated_messages.append(hydrated_message) + + return hydrated_messages + + +def build_compact_agent_citation(citation: Any, artifact_id: Optional[str] = None) -> Dict[str, Any]: + """Build a compact citation record suitable for storing on the assistant message.""" + if not isinstance(citation, dict): + compact_value = _compact_value(citation) + compact_citation = { + 'tool_name': 'Tool invocation', + 'function_result': compact_value, + } + if artifact_id: + compact_citation['artifact_id'] = artifact_id + compact_citation['raw_payload_externalized'] = True + return compact_citation + + 
function_name = str(citation.get('function_name') or '').strip() + plugin_name = str(citation.get('plugin_name') or '').strip() + compact_citation = { + 'tool_name': citation.get('tool_name') or function_name or 'Tool invocation', + 'function_name': citation.get('function_name'), + 'plugin_name': citation.get('plugin_name'), + 'duration_ms': citation.get('duration_ms'), + 'timestamp': citation.get('timestamp'), + 'success': citation.get('success'), + 'error_message': _compact_value(citation.get('error_message')), + 'function_arguments': _compact_function_arguments( + citation.get('function_arguments'), + function_name=function_name, + plugin_name=plugin_name, + ), + 'function_result': _compact_function_result( + citation.get('function_result'), + function_name=function_name, + plugin_name=plugin_name, + ), + } + if artifact_id: + compact_citation['artifact_id'] = artifact_id + compact_citation['raw_payload_externalized'] = True + return _remove_empty_values(compact_citation) + + +def _build_artifact_documents( + conversation_id: str, + assistant_message_id: str, + artifact_id: str, + artifact_kind: str, + payload: Dict[str, Any], + created_timestamp: str, + artifact_index: int, + user_info: Optional[Dict[str, Any]] = None, + citation: Optional[Dict[str, Any]] = None, +) -> List[Dict[str, Any]]: + serialized_payload = json.dumps(payload, default=str) + chunks = [ + serialized_payload[index:index + ASSISTANT_ARTIFACT_CHUNK_SIZE] + for index in range(0, len(serialized_payload), ASSISTANT_ARTIFACT_CHUNK_SIZE) + ] or [''] + + base_metadata = { + 'artifact_type': artifact_kind, + 'artifact_index': artifact_index, + 'is_chunked': len(chunks) > 1, + 'total_chunks': len(chunks), + 'chunk_index': 0, + 'root_message_id': assistant_message_id, + 'user_info': user_info, + } + if citation: + base_metadata.update({ + 'tool_name': citation.get('tool_name'), + 'function_name': citation.get('function_name'), + 'plugin_name': citation.get('plugin_name'), + }) + + main_doc = { + 'id': 
artifact_id, + 'conversation_id': conversation_id, + 'role': ASSISTANT_ARTIFACT_ROLE, + 'content': chunks[0], + 'parent_message_id': assistant_message_id, + 'artifact_kind': artifact_kind, + 'timestamp': created_timestamp, + 'metadata': base_metadata, + } + + docs = [main_doc] + for chunk_index in range(1, len(chunks)): + docs.append({ + 'id': f"{artifact_id}_chunk_{chunk_index}", + 'conversation_id': conversation_id, + 'role': ASSISTANT_ARTIFACT_CHUNK_ROLE, + 'content': chunks[chunk_index], + 'parent_message_id': artifact_id, + 'artifact_kind': artifact_kind, + 'timestamp': created_timestamp, + 'metadata': { + 'artifact_type': artifact_kind, + 'artifact_index': artifact_index, + 'is_chunk': True, + 'chunk_index': chunk_index, + 'total_chunks': len(chunks), + 'parent_message_id': artifact_id, + 'root_message_id': assistant_message_id, + 'user_info': user_info, + }, + }) + + return docs + + +def _compact_function_arguments(arguments: Any, function_name: str, plugin_name: str) -> Any: + parsed_arguments = _parse_json_if_possible(arguments) + if not isinstance(parsed_arguments, dict): + return _compact_value(parsed_arguments) + + if _is_tabular_citation(function_name, plugin_name): + filtered_arguments = { + key: value + for key, value in parsed_arguments.items() + if key not in TABULAR_ARGUMENT_EXCLUDE_KEYS + } + return _compact_value(filtered_arguments) + + return _compact_value(parsed_arguments) + + +def _compact_function_result(result: Any, function_name: str, plugin_name: str) -> Any: + parsed_result = _parse_json_if_possible(result) + if _is_tabular_citation(function_name, plugin_name): + return _compact_tabular_result_payload(function_name, parsed_result) + return _compact_value(parsed_result) + + +def _compact_tabular_result_payload(function_name: str, payload: Any) -> Any: + if not isinstance(payload, dict): + return _compact_value(payload) + + summary: Dict[str, Any] = {} + preferred_keys = [ + 'filename', + 'selected_sheet', + 'source_sheet', + 
'source_value_column', + 'target_sheet', + 'target_match_column', + 'lookup_column', + 'lookup_value', + 'target_column', + 'match_operator', + 'column', + 'operation', + 'group_by', + 'aggregate_column', + 'date_component', + 'query_expression', + 'filter_applied', + 'source_filter_applied', + 'target_filter_applied', + 'normalize_match', + 'row_count', + 'rows_scanned', + 'distinct_count', + 'returned_values', + 'values', + 'source_cohort_size', + 'matched_source_value_count', + 'unmatched_source_value_count', + 'source_value_match_counts_returned', + 'source_value_match_counts_limited', + 'matched_target_row_count', + 'total_matches', + 'returned_rows', + 'groups', + 'value', + 'result', + 'highest_group', + 'highest_value', + 'lowest_group', + 'lowest_value', + 'error', + 'candidate_sheets', + 'sheet_count', + ] + + for key in preferred_keys: + if key in payload: + summary[key] = _compact_value(payload.get(key), depth=1) + + if isinstance(payload.get('top_results'), dict): + summary['top_results'] = _compact_value(payload.get('top_results'), depth=1) + + if isinstance(payload.get('details'), list): + summary['details'] = _compact_value(payload.get('details'), depth=1) + + source_value_match_counts = payload.get('source_value_match_counts') + if isinstance(source_value_match_counts, list) and source_value_match_counts: + summary['source_value_match_counts'] = [ + _compact_value(item, depth=1) + for item in source_value_match_counts[:10] + ] + summary['source_value_match_counts_sample_limited'] = len(source_value_match_counts) > 10 + + data_rows = payload.get('data') + if isinstance(data_rows, list) and data_rows: + summary['sample_rows'] = [_compact_value(row, depth=1) for row in data_rows[:3]] + summary['sample_rows_limited'] = len(data_rows) > 3 or int(payload.get('returned_rows') or 0) > 3 + + if function_name == 'lookup_value' and 'value' not in summary and isinstance(data_rows, list) and len(data_rows) == 1: + summary['sample_rows'] = 
[_compact_value(data_rows[0], depth=1)]
+
+    return _remove_empty_values(summary)
+
+
+def _compact_value(value: Any, depth: int = 0) -> Any:
+    if value is None or isinstance(value, (int, float, bool)):
+        return value
+
+    if isinstance(value, str):
+        if len(value) <= COMPACT_VALUE_MAX_STRING:
+            return value
+        return f"{value[:COMPACT_VALUE_MAX_STRING]}... [truncated {len(value) - COMPACT_VALUE_MAX_STRING} chars]"
+
+    if depth >= COMPACT_VALUE_MAX_DEPTH:
+        if isinstance(value, dict):
+            return f"[dict with {len(value)} keys truncated at max depth]"
+        if isinstance(value, list):
+            return f"[list with {len(value)} items truncated at max depth]"
+        return str(value)
+
+    if isinstance(value, list):
+        compact_items = [_compact_value(item, depth=depth + 1) for item in value[:COMPACT_VALUE_MAX_LIST_ITEMS]]
+        if len(value) > COMPACT_VALUE_MAX_LIST_ITEMS:
+            compact_items.append({'remaining_items': len(value) - COMPACT_VALUE_MAX_LIST_ITEMS})
+        return compact_items
+
+    if isinstance(value, dict):
+        compact_mapping: Dict[str, Any] = {}
+        for index, (key, item) in enumerate(value.items()):
+            if index >= COMPACT_VALUE_MAX_DICT_KEYS:
+                compact_mapping['remaining_keys'] = len(value) - COMPACT_VALUE_MAX_DICT_KEYS
+                break
+            compact_mapping[str(key)] = _compact_value(item, depth=depth + 1)
+        return compact_mapping
+
+    return str(value)
+
+
+def _parse_json_if_possible(value: Any) -> Any:
+    if not isinstance(value, str):
+        return value
+
+    trimmed = value.strip()
+    if not trimmed or trimmed[0] not in '{[':
+        return value
+
+    try:
+        return json.loads(trimmed)
+    except Exception:
+        return value
+
+
+def _is_tabular_citation(function_name: str, plugin_name: str) -> bool:
+    return plugin_name == 'TabularProcessingPlugin' or function_name in {
+        'aggregate_column',
+        'count_rows',
+        'count_rows_by_related_values',
+        'describe_tabular_file',
+        'filter_rows',
+        'filter_rows_by_related_values',
+        'get_distinct_values',
+        'group_by_aggregate',
+        'group_by_datetime_component',
+        'lookup_value',
+        'query_tabular_data',
+    }
+
+
+def _remove_empty_values(mapping: Dict[str, Any]) -> Dict[str, Any]:
+    
cleaned: Dict[str, Any] = {} + for key, value in mapping.items(): + if value in (None, '', [], {}): + continue + cleaned[key] = value + return cleaned \ No newline at end of file diff --git a/application/single_app/json_schema_validation.py b/application/single_app/json_schema_validation.py index 8bc3ae88..abfbcb90 100644 --- a/application/single_app/json_schema_validation.py +++ b/application/single_app/json_schema_validation.py @@ -7,6 +7,26 @@ SCHEMA_DIR = os.path.join(os.path.dirname(__file__), 'static', 'json', 'schemas') +PLUGIN_STORAGE_MANAGED_FIELDS = { + '_attachments', + '_etag', + '_rid', + '_self', + '_ts', + 'created_at', + 'created_by', + 'group_id', + 'id', + 'is_global', + 'is_group', + 'last_updated', + 'modified_at', + 'modified_by', + 'scope', + 'updated_at', + 'user_id', +} + @lru_cache(maxsize=8) def load_schema(schema_name): path = os.path.join(SCHEMA_DIR, schema_name) @@ -33,11 +53,9 @@ def validate_plugin(plugin): plugin_copy = plugin.copy() plugin_type = plugin_copy.get('type', '') - # Remove Cosmos DB system fields that are not part of the plugin schema - cosmos_fields = ['_attachments', '_etag', '_rid', '_self', '_ts', 'created_at', 'updated_at', 'id', 'user_id', 'last_updated'] - for field in cosmos_fields: - if field in plugin_copy: - del plugin_copy[field] + # Remove storage-managed fields that appear on persisted plugin documents but are not part of the schema. 
+ for field in PLUGIN_STORAGE_MANAGED_FIELDS: + plugin_copy.pop(field, None) if plugin_type in ['sql_schema', 'sql_query'] and not plugin_copy.get('endpoint'): plugin_copy['endpoint'] = f'sql://{plugin_type}' diff --git a/application/single_app/requirements.txt b/application/single_app/requirements.txt index 8039c4c1..a01ab60c 100644 --- a/application/single_app/requirements.txt +++ b/application/single_app/requirements.txt @@ -1,14 +1,14 @@ # requirements.txt pandas==2.2.3 azure-monitor-query==1.4.1 -Flask==2.2.5 +Flask==3.1.3 Flask-WTF==1.2.1 -gunicorn>=23.0.0 -Werkzeug==3.1.5 -requests==2.32.4 -openai>=1.98.0 +gunicorn==25.0.3 +Werkzeug==3.1.6 +requests==2.33.0 +openai==1.109.1 docx2txt==0.8 -Markdown==3.3.4 +Markdown==3.8.1 bleach==6.1.0 azure-cosmos==4.9.0 msal==1.31.0 @@ -22,17 +22,17 @@ threadpoolctl==3.5.0 azure-search-documents==11.5.3 python-dotenv==0.21.0 azure-ai-formrecognizer==3.3.3 -azure-ai-projects>=1.0.0,<2.0.0 -azure-ai-agents>=1.0.0 -pyjwt>=2.9.0 -markdown2==2.5.3 +azure-ai-projects==1.0.0 +azure-ai-agents==1.2.0b6 +pyjwt==2.12.1 +markdown2==2.5.5 azure-mgmt-cognitiveservices==13.6.0 azure-identity==1.23.0 azure-ai-contentsafety==1.0.0 azure-storage-blob==12.24.1 azure-storage-queue==12.12.0 azure-keyvault-secrets==4.10.0 -pypdf==6.4.0 +pypdf==6.9.2 python-docx==1.1.2 flask-executor==1.0.0 PyMuPDF==1.25.3 @@ -43,16 +43,16 @@ xlrd==2.0.1 pillow==12.1.1 ffmpeg-binaries-compat==1.0.1 ffmpeg-python==0.2.0 -semantic-kernel>=1.39.4 -protobuf>=5,<6 # support for broken SK >=1.40.0 versions, can be removed once SK releases a fixed version -redis>=5.0,<6.0 -pyodbc>=4.0.0 -PyMySQL>=1.0.0 +semantic-kernel==1.39.4 +protobuf==6.33.5 +redis==5.3.1 +pyodbc==5.3.0 +PyMySQL==1.1.2 azure-monitor-opentelemetry==1.6.13 psycopg2-binary==2.9.10 -cython +cython==3.2.4 pyyaml==6.0.2 -aiohttp==3.13.3 +aiohttp==3.13.4 html2text==2025.4.15 matplotlib==3.10.7 azure-cognitiveservices-speech==1.47.0 \ No newline at end of file diff --git 
a/application/single_app/route_backend_chats.py b/application/single_app/route_backend_chats.py index 8e58ad8b..92ff4dd3 100644 --- a/application/single_app/route_backend_chats.py +++ b/application/single_app/route_backend_chats.py @@ -43,9 +43,67 @@ from swagger_wrapper import swagger_route, get_auth_security from azure.identity import ClientSecretCredential, DefaultAzureCredential, get_bearer_token_provider from functions_keyvault import SecretReturnType, keyvault_model_endpoint_get_helper +from functions_message_artifacts import ( + build_agent_citation_artifact_documents, + filter_assistant_artifact_items, +) from functions_thoughts import ThoughtTracker +def _strip_agent_citation_artifact_refs(agent_citations): + """Drop artifact references when auxiliary payload persistence fails.""" + compact_citations = [] + for citation in agent_citations or []: + if not isinstance(citation, dict): + compact_citations.append(citation) + continue + + compact_citation = dict(citation) + compact_citation.pop('artifact_id', None) + compact_citation.pop('raw_payload_externalized', None) + compact_citations.append(compact_citation) + + return compact_citations + + +def persist_agent_citation_artifacts( + conversation_id, + assistant_message_id, + agent_citations, + created_timestamp, + user_info=None, +): + """Persist raw agent citation payloads outside the primary assistant message doc.""" + if not agent_citations: + return [] + + compact_citations, artifact_docs = build_agent_citation_artifact_documents( + conversation_id=conversation_id, + assistant_message_id=assistant_message_id, + agent_citations=agent_citations, + created_timestamp=created_timestamp, + user_info=user_info, + ) + + try: + for artifact_doc in artifact_docs: + cosmos_messages_container.upsert_item(artifact_doc) + return compact_citations + except Exception as exc: + log_event( + f"[Agent Citations] Failed to persist assistant artifacts: {exc}", + extra={ + 'conversation_id': conversation_id, + 
'assistant_message_id': assistant_message_id, + 'artifact_count': len(artifact_docs), + 'citation_count': len(agent_citations), + }, + level=logging.WARNING, + exceptionTraceback=True, + ) + return _strip_agent_citation_artifact_refs(compact_citations) + + def get_tabular_discovery_function_names(): """Return discovery-oriented tabular function names from the plugin.""" from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin @@ -230,10 +288,18 @@ def build_search_augmentation_system_prompt(retrieved_content): def build_tabular_computed_results_system_message(source_label, tabular_analysis): """Build the outer-model handoff message for successful tabular analysis.""" + rendered_analysis = str(tabular_analysis or '').strip() + max_handoff_chars = 24000 + if len(rendered_analysis) > max_handoff_chars: + rendered_analysis = ( + rendered_analysis[:max_handoff_chars] + + "\n[Computed results handoff truncated for prompt budget.]" + ) + return ( f"The following tabular results were computed from {source_label} using " f"tabular_processing plugin functions:\n\n" - f"{tabular_analysis}\n\n" + f"{rendered_analysis}\n\n" "These are tool-backed results derived from the full underlying tabular data, not just retrieved schema excerpts. " "Treat them as authoritative for row-level facts, calculations, and numeric conclusions. " "Do not say that you lack direct access to the data if the answer is present in these computed results." @@ -722,7 +788,50 @@ def describe_tabular_invocation_conditions(invocation): return None -def get_tabular_query_overlap_summary(invocations, max_rows=25): +def compact_tabular_fallback_value(value, depth=0, max_depth=2): + """Reduce large tabular fallback values to prompt-safe summaries.""" + if value is None or isinstance(value, (int, float, bool)): + return value + + if isinstance(value, str): + max_string_length = 400 + if len(value) <= max_string_length: + return value + return f"{value[:max_string_length]}... 
[truncated {len(value) - max_string_length} chars]"
+
+    if depth >= max_depth:
+        if isinstance(value, dict):
+            return f"[dict with {len(value)} keys truncated at max depth]"
+        if isinstance(value, list):
+            return f"[list with {len(value)} items truncated at max depth]"
+        return str(value)
+
+    if isinstance(value, list):
+        compact_items = [
+            compact_tabular_fallback_value(item, depth=depth + 1, max_depth=max_depth)
+            for item in value[:5]
+        ]
+        if len(value) > 5:
+            compact_items.append({'remaining_items': len(value) - 5})
+        return compact_items
+
+    if isinstance(value, dict):
+        compact_mapping = {}
+        for index, (key, item) in enumerate(value.items()):
+            if index >= 12:
+                compact_mapping['remaining_keys'] = len(value) - 12
+                break
+            compact_mapping[str(key)] = compact_tabular_fallback_value(
+                item,
+                depth=depth + 1,
+                max_depth=max_depth,
+            )
+        return compact_mapping
+
+    return str(value)
+
+
+def get_tabular_query_overlap_summary(invocations, max_rows=10):
     """Summarize overlap across successful row-returning tabular calls.
 
     This is a defensive fallback for cases where tool execution succeeded but the
@@ -786,7 +895,7 @@
         if row_key not in overlapping_keys or row_key in seen_sample_keys:
             continue
 
-        ordered_sample_rows.append(row)
+        ordered_sample_rows.append(compact_tabular_fallback_value(row))
         seen_sample_keys.add(row_key)
         if len(ordered_sample_rows) >= max_rows:
             break
@@ -795,7 +904,7 @@
     for grouped_item in grouped_items:
         rendered_conditions = describe_tabular_invocation_conditions(grouped_item['invocation'])
         if rendered_conditions:
-            source_queries.append(rendered_conditions)
+            source_queries.append(compact_tabular_fallback_value(rendered_conditions))
 
     overlap_summary = {
         'filename': filename or None,
@@ -813,7 +922,7 @@
     return best_summary
 
 
-def get_tabular_invocation_compact_payload(invocation, max_rows=10):
+def get_tabular_invocation_compact_payload(invocation, 
max_rows=5): """Return a compact, prompt-safe summary of a successful tabular invocation.""" result_payload = get_tabular_invocation_result_payload(invocation) if not result_payload: @@ -822,15 +931,15 @@ def get_tabular_invocation_compact_payload(invocation, max_rows=10): function_name = getattr(invocation, 'function_name', '') compact_payload = { 'function': function_name, - 'filename': result_payload.get('filename'), - 'selected_sheet': result_payload.get('selected_sheet'), + 'filename': compact_tabular_fallback_value(result_payload.get('filename')), + 'selected_sheet': compact_tabular_fallback_value(result_payload.get('selected_sheet')), } if function_name == 'aggregate_column': compact_payload.update({ - 'column': result_payload.get('column'), - 'operation': result_payload.get('operation'), - 'result': result_payload.get('result'), + 'column': compact_tabular_fallback_value(result_payload.get('column')), + 'operation': compact_tabular_fallback_value(result_payload.get('operation')), + 'result': compact_tabular_fallback_value(result_payload.get('result')), }) elif function_name in {'group_by_aggregate', 'group_by_datetime_component'}: for key_name in ( @@ -846,7 +955,7 @@ def get_tabular_invocation_compact_payload(invocation, max_rows=10): 'top_results', ): if key_name in result_payload: - compact_payload[key_name] = result_payload.get(key_name) + compact_payload[key_name] = compact_tabular_fallback_value(result_payload.get(key_name)) elif function_name == 'lookup_value': for key_name in ( 'lookup_column', @@ -857,27 +966,39 @@ def get_tabular_invocation_compact_payload(invocation, max_rows=10): 'returned_rows', ): if key_name in result_payload: - compact_payload[key_name] = result_payload.get(key_name) + compact_payload[key_name] = compact_tabular_fallback_value(result_payload.get(key_name)) data_rows = get_tabular_invocation_data_rows(invocation) if data_rows: - compact_payload['sample_rows'] = data_rows[:max_rows] + compact_payload['sample_rows'] = [ + 
compact_tabular_fallback_value(row) + for row in data_rows[:max_rows] + ] compact_payload['sample_rows_limited'] = len(data_rows) > max_rows elif function_name in {'query_tabular_data', 'filter_rows'}: for key_name in ('total_matches', 'returned_rows'): if key_name in result_payload: - compact_payload[key_name] = result_payload.get(key_name) + compact_payload[key_name] = compact_tabular_fallback_value(result_payload.get(key_name)) data_rows = get_tabular_invocation_data_rows(invocation) if data_rows: - compact_payload['sample_rows'] = data_rows[:max_rows] + compact_payload['sample_rows'] = [ + compact_tabular_fallback_value(row) + for row in data_rows[:max_rows] + ] compact_payload['sample_rows_limited'] = len(data_rows) > max_rows rendered_conditions = describe_tabular_invocation_conditions(invocation) if rendered_conditions: - compact_payload['conditions'] = rendered_conditions + compact_payload['conditions'] = compact_tabular_fallback_value(rendered_conditions) else: - compact_payload.update(result_payload) + compact_payload.update({ + key: compact_tabular_fallback_value(value) + for key, value in result_payload.items() + }) + + if '[truncated ' in json.dumps(compact_payload, default=str): + compact_payload['result_summary_truncated'] = True return compact_payload @@ -895,34 +1016,88 @@ def build_tabular_analysis_fallback_from_invocations(invocations): if not successful_invocations: return None - overlap_summary = get_tabular_query_overlap_summary(successful_invocations) - compact_results = [] - for invocation in successful_invocations[:8]: - compact_payload = get_tabular_invocation_compact_payload(invocation) - if compact_payload is None: - continue - compact_results.append(compact_payload) - - if not overlap_summary and not compact_results: - return None - + max_fallback_chars = 24000 + coverage_note_reserve = 1200 + overlap_summary = get_tabular_query_overlap_summary(successful_invocations, max_rows=10) rendered_sections = [ "The following structured results 
come directly from successful tabular tool executions.", "Use them as computed evidence even though the inner tabular synthesis step did not complete.", ] if overlap_summary: + if overlap_summary.get('sample_rows') and len(json.dumps(overlap_summary, default=str)) > 6000: + overlap_summary = dict(overlap_summary) + overlap_summary['sample_rows'] = overlap_summary.get('sample_rows', [])[:5] + overlap_summary['sample_rows_limited'] = True + rendered_sections.append( "OVERLAP SUMMARY:\n" f"{json.dumps(overlap_summary, indent=2, default=str)}" ) + base_rendered_text = "\n\n".join(rendered_sections) + compact_results = [] + invocation_limit = 8 + candidate_invocations = successful_invocations[:invocation_limit] + for invocation in candidate_invocations: + compact_payload = get_tabular_invocation_compact_payload(invocation, max_rows=5) + if compact_payload is None: + continue + + candidate_results = compact_results + [compact_payload] + candidate_section = ( + "TOOL RESULT SUMMARIES:\n" + f"{json.dumps(candidate_results, indent=2, default=str)}" + ) + candidate_text = base_rendered_text + ("\n\n" if base_rendered_text else "") + candidate_section + if len(candidate_text) <= (max_fallback_chars - coverage_note_reserve): + compact_results = candidate_results + continue + + if compact_results: + break + + shrunk_payload = dict(compact_payload) + if 'sample_rows' in shrunk_payload: + shrunk_payload['sample_rows'] = shrunk_payload['sample_rows'][:2] + shrunk_payload['sample_rows_limited'] = True + if isinstance(shrunk_payload.get('top_results'), dict): + shrunk_payload['top_results'] = dict(list(shrunk_payload['top_results'].items())[:3]) + + candidate_section = ( + "TOOL RESULT SUMMARIES:\n" + f"{json.dumps([shrunk_payload], indent=2, default=str)}" + ) + candidate_text = base_rendered_text + ("\n\n" if base_rendered_text else "") + candidate_section + if len(candidate_text) > (max_fallback_chars - coverage_note_reserve): + shrunk_payload.pop('sample_rows', None) + 
shrunk_payload['sample_rows_limited'] = True + shrunk_payload['result_summary_truncated'] = True + if isinstance(shrunk_payload.get('top_results'), dict): + shrunk_payload['top_results'] = dict(list(shrunk_payload['top_results'].items())[:2]) + + compact_results = [shrunk_payload] + break + + if not overlap_summary and not compact_results: + return None + if compact_results: rendered_sections.append( "TOOL RESULT SUMMARIES:\n" f"{json.dumps(compact_results, indent=2, default=str)}" ) + omitted_invocation_count = len(candidate_invocations) - len(compact_results) + if len(successful_invocations) > invocation_limit: + omitted_invocation_count += len(successful_invocations) - invocation_limit + if omitted_invocation_count > 0: + rendered_sections.append( + "RESULT COVERAGE NOTE:\n" + f"Included {len(compact_results)} compact tool summaries out of {len(successful_invocations)} successful tool executions to stay within the prompt budget. " + "Use targeted follow-up tool calls if additional raw detail is required." 
+ ) + return "\n\n".join(rendered_sections) @@ -1190,6 +1365,80 @@ def _tokenize_tabular_sheet_text(text): return tokens +def _extract_tabular_entity_anchor_terms(question_text): + """Extract likely primary-entity terms from an entity lookup question.""" + normalized_question = str(question_text or '').strip().lower() + if not normalized_question: + return [] + + stopwords = { + 'and', + 'any', + 'by', + 'detail', + 'details', + 'exact', + 'explain', + 'find', + 'for', + 'from', + 'full', + 'get', + 'give', + 'lookup', + 'me', + 'of', + 'or', + 'profile', + 'profiles', + 'record', + 'records', + 'related', + 'show', + 'story', + 'summaries', + 'summarize', + 'summary', + 'that', + 'the', + 'their', + 'this', + 'those', + 'these', + 'to', + 'up', + 'with', + } + capture_patterns = ( + r'\bfind\s+([^\.;:!?]+)', + r'\blookup\s+([^\.;:!?]+)', + ) + anchor_terms = [] + seen_anchor_terms = set() + + for capture_pattern in capture_patterns: + match = re.search(capture_pattern, normalized_question) + if not match: + continue + + captured_text = re.split( + r'\b(?:and|show|summarize|summary|profile|with|where|which|who|that)\b', + match.group(1), + maxsplit=1, + )[0] + for token in _tokenize_tabular_sheet_text(captured_text): + if token in stopwords: + continue + if any(character.isdigit() for character in token): + continue + if token in seen_anchor_terms: + continue + seen_anchor_terms.add(token) + anchor_terms.append(token) + + return anchor_terms + + def _score_tabular_sheet_match(sheet_name, question_text, columns=None): """Score how strongly a worksheet name matches the user question. 
@@ -1228,15 +1477,54 @@ def _score_tabular_sheet_match(sheet_name, question_text, columns=None): return score -def _select_relevant_workbook_sheets(sheet_names, question_text, minimum_score=1, per_sheet=None): +def _score_tabular_entity_sheet_match(sheet_name, question_text, columns=None): + """Score worksheets for entity lookups, prioritizing the primary entity sheet.""" + score = _score_tabular_sheet_match(sheet_name, question_text, columns=columns) + anchor_terms = _extract_tabular_entity_anchor_terms(question_text) + if not anchor_terms: + return score + + question_tokens = set(_tokenize_tabular_sheet_text(question_text)) + sheet_tokens = set(_tokenize_tabular_sheet_text(sheet_name)) + column_tokens = set() + for column_name in columns or []: + column_tokens.update(_tokenize_tabular_sheet_text(column_name)) + + for anchor_term in anchor_terms: + if anchor_term in sheet_tokens: + score += 12 + elif anchor_term in column_tokens: + score += 4 + + if 'profile' in question_tokens and column_tokens.intersection({ + 'address', + 'city', + 'displayname', + 'dob', + 'email', + 'firstname', + 'fullname', + 'lastname', + 'name', + 'phone', + 'state', + 'status', + }): + score += 6 + + return score + + +def _select_relevant_workbook_sheets(sheet_names, question_text, minimum_score=1, per_sheet=None, score_match_fn=None): """Return all workbook sheets that appear relevant to the question.""" + score_match_fn = score_match_fn or _score_tabular_sheet_match ranked_sheets = [] for sheet_name in sheet_names or []: columns = None if per_sheet: sheet_info = per_sheet.get(sheet_name, {}) columns = sheet_info.get('columns', []) - score = _score_tabular_sheet_match(sheet_name, question_text, columns=columns) + score = score_match_fn(sheet_name, question_text, columns=columns) if score < minimum_score: continue ranked_sheets.append((score, sheet_name)) @@ -1323,8 +1611,9 @@ def is_tabular_access_limited_analysis(analysis_text): return any(phrase in normalized_analysis for phrase in 
inaccessible_phrases) -def _select_likely_workbook_sheet(sheet_names, question_text, per_sheet=None): +def _select_likely_workbook_sheet(sheet_names, question_text, per_sheet=None, score_match_fn=None): """Return a likely sheet name when the user question strongly matches one sheet.""" + score_match_fn = score_match_fn or _score_tabular_sheet_match best_sheet = None best_score = 0 runner_up_score = 0 @@ -1334,7 +1623,7 @@ def _select_likely_workbook_sheet(sheet_names, question_text, per_sheet=None): if per_sheet: sheet_info = per_sheet.get(sheet_name, {}) columns = sheet_info.get('columns', []) - score = _score_tabular_sheet_match(sheet_name, question_text, columns=columns) + score = score_match_fn(sheet_name, question_text, columns=columns) if score > best_score: runner_up_score = best_score @@ -1427,6 +1716,7 @@ async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, workbook_blob_locations = {} retry_sheet_overrides = {} previous_failed_call_parameters = [] # entity lookup: concrete failed call params for retry hints + sheet_score_match_fn = _score_tabular_entity_sheet_match if entity_lookup_mode else _score_tabular_sheet_match allowed_function_filters = { 'included_functions': [ f"tabular_processing-{function_name}" @@ -1459,11 +1749,13 @@ async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, schema_info.get('sheet_names', []), user_question, per_sheet=per_sheet, + score_match_fn=sheet_score_match_fn, ) relevant_sheets = _select_relevant_workbook_sheets( schema_info.get('sheet_names', []), user_question, per_sheet=per_sheet, + score_match_fn=sheet_score_match_fn, ) cross_sheet_bridge_plan = None if not schema_summary_mode and not entity_lookup_mode: @@ -1509,6 +1801,8 @@ async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, 'is_workbook': True, 'sheet_count': schema_info.get('sheet_count', 0), 'likely_sheet': likely_sheet, + 'sheet_role_hints': schema_info.get('sheet_role_hints', {}), + 
'relationship_hints': schema_info.get('relationship_hints', [])[:5], 'sheet_directory': sheet_directory, } schema_parts.append(json.dumps(directory_schema, indent=2, default=str)) @@ -1739,19 +2033,24 @@ def build_system_prompt(force_tool_use=False, tool_error_messages=None, executio f"{missing_sheet_feedback}" f"FILE SCHEMAS:\n" f"{schema_context}\n\n" - "AVAILABLE FUNCTIONS: lookup_value, aggregate_column, filter_rows, query_tabular_data, " - "group_by_aggregate, and group_by_datetime_component.\n\n" + "AVAILABLE FUNCTIONS: filter_rows, query_tabular_data, lookup_value, get_distinct_values, count_rows, " + "filter_rows_by_related_values, count_rows_by_related_values, aggregate_column, group_by_aggregate, and group_by_datetime_component.\n\n" "Discovery functions are not available in this analysis run because schema context is already pre-loaded.\n\n" "IMPORTANT:\n" - "1. Pass sheet_name='' on EVERY analytical call for multi-sheet workbooks. Do not rely on a default sheet for cross-sheet entity lookups.\n" - "2. First retrieve the primary entity row on the most relevant worksheet.\n" - "3. Then query other relevant worksheets explicitly to collect related records.\n" - "4. When a retrieved row contains a secondary identifier such as ReturnID, CaseID, AccountID, PaymentID, W2ID, or Form1099ID, reuse it to query dependent worksheets.\n" - "5. Do not stop after the first successful row if the question asks for related records across sheets.\n" - "6. If a requested record type has no corresponding worksheet in the workbook, say that the workbook does not contain that record type.\n" - "7. Clearly distinguish between no matching rows and no corresponding worksheet.\n" - "8. Summarize concrete found records sheet-by-sheet using the tool results, not schema placeholders.\n" - "9. Do not mention hypothetical follow-up analyses, parser errors, or failed attempts unless the user explicitly asked about failures and you have actual tool error output to report." + "1. 
If the question includes an exact identifier or exact entity name and the correct starting worksheet is unclear, begin with filter_rows or query_tabular_data without sheet_name so the plugin can perform a cross-sheet discovery search.\n" + "2. After the first discovery step, pass sheet_name='' on follow-up analytical calls for multi-sheet workbooks. Do not rely on a default sheet for cross-sheet entity lookups.\n" + "3. Use filter_rows or query_tabular_data first when you need full matching rows. Use lookup_value only when you already know the exact worksheet and target column.\n" + "4. Do not start with aggregate_column, group_by_aggregate, or group_by_datetime_component until you have located the relevant entity rows.\n" + "5. When using query_tabular_data, use simple DataFrame.query() syntax with backticked column names for columns containing spaces. Avoid method calls such as .str.lower() or .astype(...).\n" + "6. Then query other relevant worksheets explicitly to collect related records.\n" + "7. When a retrieved row contains a secondary identifier such as ReturnID, CaseID, AccountID, PaymentID, W2ID, or Form1099ID, reuse it to query dependent worksheets.\n" + "8. Do not stop after the first successful row if the question asks for related records across sheets.\n" + "9. If a requested record type has no corresponding worksheet in the workbook, say that the workbook does not contain that record type.\n" + "10. Clearly distinguish between no matching rows and no corresponding worksheet.\n" + "11. Summarize concrete found records sheet-by-sheet using the tool results, not schema placeholders.\n" + "12. For count or percentage questions involving a cohort defined on one sheet and facts on another, prefer get_distinct_values, count_rows, filter_rows_by_related_values, or count_rows_by_related_values over manually counting sampled rows.\n" + "13. 
Use normalize_match=true when matching names, owners, assignees, engineers, or similar entity-text columns across worksheets.\n" + "14. Do not mention hypothetical follow-up analyses, parser errors, or failed attempts unless the user explicitly asked about failures and you have actual tool error output to report." ) return ( @@ -1770,8 +2069,8 @@ def build_system_prompt(force_tool_use=False, tool_error_messages=None, executio f"{missing_sheet_feedback}" f"FILE SCHEMAS:\n" f"{schema_context}\n\n" - "AVAILABLE FUNCTIONS: lookup_value, aggregate_column, filter_rows, query_tabular_data, " - "group_by_aggregate, and group_by_datetime_component for year/quarter/month/week/day/hour trend analysis.\n\n" + "AVAILABLE FUNCTIONS: lookup_value, get_distinct_values, count_rows, filter_rows, query_tabular_data, " + "filter_rows_by_related_values, count_rows_by_related_values, aggregate_column, group_by_aggregate, and group_by_datetime_component for year/quarter/month/week/day/hour trend analysis.\n\n" "Discovery functions are not available in this analysis run because schema context is already pre-loaded.\n\n" "IMPORTANT:\n" "1. Use the pre-loaded schema to pick the correct columns, then call the plugin functions.\n" @@ -1779,18 +2078,23 @@ def build_system_prompt(force_tool_use=False, tool_error_messages=None, executio "3. If a previous tool error says a requested column is missing on the current sheet and suggests candidate sheets, switch to one of those candidate sheets immediately.\n" "4. For account/category lookup questions at a specific period or metric, use lookup_value first. Provide lookup_column, lookup_value, and target_column.\n" "5. If lookup_value is not sufficient, use filter_rows or query_tabular_data on the label column, then read the requested period column.\n" - "6. Only use aggregate_column when the user explicitly asks for a sum, average, min, max, or count across rows.\n" - "7. 
For time-based questions on datetime columns, use group_by_datetime_component.\n" - "8. For threshold, ranking, comparison, or correlation-like questions, first filter/query the relevant rows, then compute grouped metrics.\n" - "9. When the question asks for grouped results for each entity or category and a cross-sheet bridge plan is available, use the reference worksheet to identify the canonical entities or categories and the fact worksheet to compute the metric. Do not answer 'each X' by grouping a yes/no, boolean, or membership-flag column unless the user explicitly asked about that flag.\n" - "10. When the question asks for rows satisfying multiple conditions, prefer one combined query_expression using and/or instead of separate broad queries that you plan to intersect later.\n" - "11. Batch multiple independent function calls in a SINGLE response whenever possible.\n" - "12. Keep max_rows as small as possible. Only increase it when the user explicitly asked for an exhaustive row list or export; otherwise return total_matches plus representative rows.\n" - "13. For analytical questions, prefer lookup/filter/query plus aggregate/grouped computations over raw row or preview output.\n" - "14. For identifier-based workbook questions, locate the identifier on the correct sheet before explaining downstream calculations.\n" - "15. For peak, busiest, highest, or lowest questions, use grouped functions and inspect the highest_group, highest_value, lowest_group, and lowest_value summary fields.\n" - "16. Return only computed findings and name the strongest drivers clearly.\n" - "17. Do not mention hypothetical follow-up analyses, parser errors, or failed attempts unless the user explicitly asked about failures and you have actual tool error output to report." + "6. For deterministic how-many questions, use count_rows instead of estimating counts from partial returned rows.\n" + "7. 
For cohort, membership, ownership-share, or percentage questions where one sheet defines the group and another sheet contains the fact rows, use get_distinct_values, filter_rows_by_related_values, or count_rows_by_related_values.\n" + "8. When the question asks for one named member's share within that cohort, prefer count_rows_by_related_values and either read source_value_match_counts from the helper result or rerun count_rows_by_related_values with source_filter_column/source_filter_value on the reference sheet. Do not fall back to query_tabular_data or filter_rows on the fact sheet with a guessed exact text value unless the workbook already exposed that canonical target value.\n" + "9. Use normalize_match=true when matching names, owners, assignees, engineers, or similar entity-text columns across worksheets.\n" + "10. Only use aggregate_column when the user explicitly asks for a sum, average, min, max, or count across rows and count_rows is not the simpler deterministic option.\n" + "11. For time-based questions on datetime columns, use group_by_datetime_component.\n" + "12. For threshold, ranking, comparison, or correlation-like questions, first filter/query the relevant rows, then compute grouped metrics.\n" + "13. When the question asks for grouped results for each entity or category and a cross-sheet bridge plan or relationship hint is available, use the reference worksheet to identify the canonical entities or categories and the fact worksheet to compute the metric. Do not answer 'each X' by grouping a yes/no, boolean, or membership-flag column unless the user explicitly asked about that flag.\n" + "14. When the question asks for rows satisfying multiple conditions, prefer one combined query_expression using and/or instead of separate broad queries that you plan to intersect later.\n" + "15. Batch multiple independent function calls in a SINGLE response whenever possible.\n" + "16. Keep max_rows as small as possible. 
Only increase it when the user explicitly asked for an exhaustive row list or export; otherwise return total_matches plus representative rows.\n" + "17. For analytical questions, prefer deterministic counts plus lookup/filter/query/grouped computations over raw row or preview output.\n" + "18. For identifier-based workbook questions, locate the identifier on the correct sheet before explaining downstream calculations.\n" + "19. For peak, busiest, highest, or lowest questions, use grouped functions and inspect the highest_group, highest_value, lowest_group, and lowest_value summary fields.\n" + "20. Return only computed findings and name the strongest drivers clearly.\n" + "21. Do not mention hypothetical follow-up analyses, parser errors, or failed attempts unless the user explicitly asked about failures and you have actual tool error output to report.\n" + "22. When using query_tabular_data, use simple DataFrame.query() syntax with backticked column names for columns containing spaces. Avoid method calls such as .str.lower(), .astype(...), or other Python expressions that DataFrame.query() may reject." 
) baseline_invocations = plugin_logger.get_invocations_for_conversation( @@ -4301,6 +4605,7 @@ def result_requires_message_reload(result: Any) -> bool: all_messages = list(cosmos_messages_container.query_items( query=all_messages_query, parameters=params_all, partition_key=conversation_id, enable_cross_partition_query=True )) + all_messages = filter_assistant_artifact_items(all_messages) # Sort messages using threading logic all_messages = sort_messages_by_thread(all_messages) @@ -5468,18 +5773,26 @@ def gpt_error(e): # Assistant message should be part of the same thread as the user message # Only system/augmentation messages create new threads within a conversation + assistant_timestamp = datetime.utcnow().isoformat() + prepared_agent_citations = persist_agent_citation_artifacts( + conversation_id=conversation_id, + assistant_message_id=assistant_message_id, + agent_citations=agent_citations_list, + created_timestamp=assistant_timestamp, + user_info=user_info_for_assistant, + ) + assistant_doc = { 'id': assistant_message_id, 'conversation_id': conversation_id, 'role': 'assistant', 'content': ai_message, - 'timestamp': datetime.utcnow().isoformat(), + 'timestamp': assistant_timestamp, 'augmented': bool(system_messages_for_augmentation), 'hybrid_citations': hybrid_citations_list, # <--- SIMPLIFIED: Directly use the list 'web_search_citations': web_search_citations_list, 'hybridsearch_query': search_query if hybrid_search_enabled and search_results else None, # Log query only if hybrid search ran and found results - 'agent_citations': agent_citations_list, # <--- NEW: Store agent tool invocation results - 'user_message': user_message, + 'agent_citations': prepared_agent_citations, 'model_deployment_name': actual_model_used, 'agent_display_name': agent_display_name, 'agent_name': agent_name, @@ -5616,7 +5929,7 @@ def gpt_error(e): 'augmented': bool(system_messages_for_augmentation), 'hybrid_citations': hybrid_citations_list, 'web_search_citations': 
web_search_citations_list, - 'agent_citations': agent_citations_list, + 'agent_citations': prepared_agent_citations, 'reload_messages': reload_messages_required, 'kernel_fallback_notice': kernel_fallback_notice, 'thoughts_enabled': thought_tracker.enabled @@ -6755,6 +7068,7 @@ def publish_live_plugin_thought(thought_payload): query=all_messages_query, parameters=params_all, partition_key=conversation_id, enable_cross_partition_query=True )) + all_messages = filter_assistant_artifact_items(all_messages) # Sort messages using threading logic all_messages = sort_messages_by_thread(all_messages) @@ -7360,28 +7674,37 @@ def make_json_serializable(obj): # Get user thread info to maintain thread consistency user_thread_id = None user_previous_thread_id = None + user_info_for_assistant = None try: user_msg = cosmos_messages_container.read_item( item=user_message_id, partition_key=conversation_id ) + user_info_for_assistant = user_msg.get('metadata', {}).get('user_info') user_thread_id = user_msg.get('metadata', {}).get('thread_info', {}).get('thread_id') user_previous_thread_id = user_msg.get('metadata', {}).get('thread_info', {}).get('previous_thread_id') except Exception as e: debug_print(f"Warning: Could not retrieve thread_id from user message: {e}") - + assistant_timestamp = datetime.utcnow().isoformat() + prepared_agent_citations = persist_agent_citation_artifacts( + conversation_id=conversation_id, + assistant_message_id=assistant_message_id, + agent_citations=agent_citations_list, + created_timestamp=assistant_timestamp, + user_info=user_info_for_assistant, + ) + assistant_doc = { 'id': assistant_message_id, 'conversation_id': conversation_id, 'role': 'assistant', 'content': accumulated_content, - 'timestamp': datetime.utcnow().isoformat(), + 'timestamp': assistant_timestamp, 'augmented': bool(system_messages_for_augmentation), 'hybrid_citations': hybrid_citations_list, 'web_search_citations': web_search_citations_list, 'hybridsearch_query': search_query if 
hybrid_search_enabled and search_results else None, - 'agent_citations': agent_citations_list, - 'user_message': user_message, + 'agent_citations': prepared_agent_citations, 'model_deployment_name': final_model_used if use_agent_streaming else gpt_model, 'agent_display_name': agent_display_name_used if use_agent_streaming else None, 'agent_name': agent_name_used if use_agent_streaming else None, @@ -7500,7 +7823,7 @@ def make_json_serializable(obj): 'augmented': bool(system_messages_for_augmentation), 'hybrid_citations': hybrid_citations_list, 'web_search_citations': web_search_citations_list, - 'agent_citations': agent_citations_list, + 'agent_citations': prepared_agent_citations, 'agent_display_name': agent_display_name_used if use_agent_streaming else None, 'agent_name': agent_name_used if use_agent_streaming else None, 'full_content': accumulated_content, @@ -7522,19 +7845,26 @@ def make_json_serializable(obj): # Save partial response if we have content if accumulated_content: current_assistant_thread_id = str(uuid.uuid4()) + assistant_timestamp = datetime.utcnow().isoformat() + prepared_agent_citations = persist_agent_citation_artifacts( + conversation_id=conversation_id, + assistant_message_id=assistant_message_id, + agent_citations=agent_citations_list, + created_timestamp=assistant_timestamp, + user_info=user_info_for_assistant, + ) assistant_doc = { 'id': assistant_message_id, 'conversation_id': conversation_id, 'role': 'assistant', 'content': accumulated_content, - 'timestamp': datetime.utcnow().isoformat(), + 'timestamp': assistant_timestamp, 'augmented': bool(system_messages_for_augmentation), 'hybrid_citations': hybrid_citations_list, 'web_search_citations': web_search_citations_list, 'hybridsearch_query': search_query if hybrid_search_enabled and search_results else None, - 'agent_citations': agent_citations_list, - 'user_message': user_message, + 'agent_citations': prepared_agent_citations, 'model_deployment_name': final_model_used if 
use_agent_streaming else gpt_model, 'agent_display_name': agent_display_name_used if use_agent_streaming else None, 'agent_name': agent_name_used if use_agent_streaming else None, diff --git a/application/single_app/route_backend_conversation_export.py b/application/single_app/route_backend_conversation_export.py index 689d3476..b070c222 100644 --- a/application/single_app/route_backend_conversation_export.py +++ b/application/single_app/route_backend_conversation_export.py @@ -18,6 +18,11 @@ from functions_chat import sort_messages_by_thread from functions_conversation_metadata import update_conversation_with_metadata from functions_debug import debug_print +from functions_message_artifacts import ( + build_message_artifact_payload_map, + hydrate_agent_citations_from_artifacts, + is_assistant_artifact_role, +) from functions_settings import * from functions_thoughts import get_thoughts_for_conversation from swagger_wrapper import swagger_route, get_auth_security @@ -188,6 +193,20 @@ def api_export_message_word(): if message.get('conversation_id') != conversation_id: return jsonify({'error': 'Message not found'}), 404 + if isinstance(message.get('agent_citations'), list) and any( + isinstance(citation, dict) and citation.get('artifact_id') + for citation in message.get('agent_citations', []) + ): + conversation_messages = list(cosmos_messages_container.query_items( + query="SELECT * FROM c WHERE c.conversation_id = @conversation_id", + parameters=[{'name': '@conversation_id', 'value': conversation_id}], + partition_key=conversation_id, + )) + artifact_payload_map = build_message_artifact_payload_map(conversation_messages) + hydrated_messages = hydrate_agent_citations_from_artifacts([message], artifact_payload_map) + if hydrated_messages: + message = hydrated_messages[0] + document_bytes = _message_to_docx_bytes(message) timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') filename = f"message_export_{timestamp_str}.docx" @@ -213,7 +232,9 @@ def 
_build_export_entry( include_summary_intro: bool = False, summary_model_deployment: str = '' ) -> Dict[str, Any]: + artifact_payload_map = build_message_artifact_payload_map(raw_messages) filtered_messages = _filter_messages_for_export(raw_messages) + filtered_messages = hydrate_agent_citations_from_artifacts(filtered_messages, artifact_payload_map) ordered_messages = sort_messages_by_thread(filtered_messages) raw_thoughts = get_thoughts_for_conversation(conversation.get('id'), user_id) @@ -285,6 +306,9 @@ def _build_export_entry( def _filter_messages_for_export(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: filtered_messages = [] for message in messages: + if is_assistant_artifact_role(message.get('role')): + continue + metadata = message.get('metadata', {}) or {} if metadata.get('is_deleted') is True: continue diff --git a/application/single_app/route_backend_conversations.py b/application/single_app/route_backend_conversations.py index 8473179f..23f1a071 100644 --- a/application/single_app/route_backend_conversations.py +++ b/application/single_app/route_backend_conversations.py @@ -8,6 +8,7 @@ from functions_notifications import mark_chat_response_notifications_read_for_conversation from flask import Response, request from functions_debug import debug_print +from functions_message_artifacts import filter_assistant_artifact_items from swagger_wrapper import swagger_route, get_auth_security from functions_activity_logging import log_conversation_creation, log_conversation_deletion, log_conversation_archival from functions_thoughts import archive_thoughts_for_conversation, delete_thoughts_for_conversation @@ -37,6 +38,40 @@ def normalize_chat_type(conversation_item): conversation_item['chat_type'] = chat_type return chat_type, True + +def _collect_child_message_documents(conversation_id, root_message_ids): + """Collect child records linked by parent_message_id for the provided message ids.""" + pending_ids = [message_id for message_id in root_message_ids 
if message_id] + seen_ids = set(pending_ids) + child_docs = [] + + while pending_ids: + parent_message_id = pending_ids.pop(0) + child_query = ( + "SELECT * FROM c " + "WHERE c.conversation_id = @conversation_id " + "AND c.parent_message_id = @parent_message_id" + ) + child_results = list(cosmos_messages_container.query_items( + query=child_query, + parameters=[ + {'name': '@conversation_id', 'value': conversation_id}, + {'name': '@parent_message_id', 'value': parent_message_id}, + ], + partition_key=conversation_id, + )) + + for child_doc in child_results: + child_id = child_doc.get('id') + if not child_id or child_id in seen_ids: + continue + + seen_ids.add(child_id) + child_docs.append(child_doc) + pending_ids.append(child_id) + + return child_docs + def register_route_backend_conversations(app): @app.route('/api/get_messages', methods=['GET']) @@ -69,6 +104,7 @@ def api_get_messages(): query=message_query, partition_key=conversation_id )) + all_items = filter_assistant_artifact_items(all_items) debug_print(f"Query returned {len(all_items)} total items (before filtering)") @@ -949,6 +985,7 @@ def generate_conversation_summary_api(conversation_id): parameters=params, enable_cross_partition_query=True )) + raw_messages = filter_assistant_artifact_items(raw_messages) except Exception as e: debug_print(f"Error querying messages for summary: {e}") return jsonify({'error': 'Failed to query messages'}), 500 @@ -1500,6 +1537,13 @@ def delete_message(message_id): else: # Delete only the specified message messages_to_delete = [message_doc] + + child_message_docs = _collect_child_message_documents( + conversation_id, + [message.get('id') for message in messages_to_delete], + ) + if child_message_docs: + messages_to_delete.extend(child_message_docs) # THREAD ATTEMPT PROMOTION: If deleting an active thread attempt, promote next attempt if messages_to_delete: diff --git a/application/single_app/route_backend_plugins.py b/application/single_app/route_backend_plugins.py index 
f958b10e..115bf282 100644 --- a/application/single_app/route_backend_plugins.py +++ b/application/single_app/route_backend_plugins.py @@ -37,7 +37,7 @@ #from functions_personal_actions import delete_personal_action from functions_debug import debug_print -from json_schema_validation import validate_plugin +from json_schema_validation import PLUGIN_STORAGE_MANAGED_FIELDS, validate_plugin from functions_activity_logging import ( log_action_creation, log_action_update, @@ -336,61 +336,63 @@ def set_user_plugins(): for plugin in plugins: if plugin.get('name', '').lower() in global_plugin_names: continue # Skip global plugins + plugin_to_save = dict(plugin) # Remove is_global if present - if 'is_global' in plugin: - del plugin['is_global'] + if 'is_global' in plugin_to_save: + del plugin_to_save['is_global'] # Ensure required fields have default values - plugin.setdefault('name', '') - plugin.setdefault('displayName', plugin.get('name', '')) - plugin.setdefault('description', '') - plugin.setdefault('metadata', {}) - plugin.setdefault('additionalFields', {}) + plugin_to_save.setdefault('name', '') + plugin_to_save.setdefault('displayName', plugin_to_save.get('name', '')) + plugin_to_save.setdefault('description', '') + plugin_to_save.setdefault('metadata', {}) + plugin_to_save.setdefault('additionalFields', {}) - # Remove Cosmos DB system fields that are not part of the plugin schema - cosmos_fields = ['_attachments', '_etag', '_rid', '_self', '_ts', 'created_at', 'updated_at', 'user_id', 'last_updated'] - for field in cosmos_fields: - if field in plugin: - del plugin[field] + # Remove storage-managed fields that are not part of the plugin manifest schema, + # but preserve the action ID so existing records can be updated in place. 
+ for field in PLUGIN_STORAGE_MANAGED_FIELDS: + if field == 'id': + continue + plugin_to_save.pop(field, None) # Handle endpoint based on plugin type - plugin_type = plugin.get('type', '') + plugin_type = plugin_to_save.get('type', '') if plugin_type in ['sql_schema', 'sql_query']: # SQL plugins don't use endpoints, but schema validation requires one # Use a placeholder that indicates it's a SQL plugin - plugin.setdefault('endpoint', f'sql://{plugin_type}') + plugin_to_save.setdefault('endpoint', f'sql://{plugin_type}') elif plugin_type == 'msgraph': # MS Graph plugin does not require an endpoint, but schema validation requires one #TODO: Update to support different clouds - plugin.setdefault('endpoint', 'https://graph.microsoft.com') + plugin_to_save.setdefault('endpoint', 'https://graph.microsoft.com') else: # For other plugin types, require a real endpoint - plugin.setdefault('endpoint', '') + plugin_to_save.setdefault('endpoint', '') # Ensure auth has default structure - if 'auth' not in plugin: - plugin['auth'] = {'type': 'identity'} - elif not isinstance(plugin['auth'], dict): - plugin['auth'] = {'type': 'identity'} - elif 'type' not in plugin['auth']: - plugin['auth']['type'] = 'identity' + if 'auth' not in plugin_to_save: + plugin_to_save['auth'] = {'type': 'identity'} + elif not isinstance(plugin_to_save['auth'], dict): + plugin_to_save['auth'] = {'type': 'identity'} + elif 'type' not in plugin_to_save['auth']: + plugin_to_save['auth']['type'] = 'identity' # Auto-fill type from metadata if missing or empty - if not plugin.get('type'): - if plugin.get('metadata', {}).get('type'): - plugin['type'] = plugin['metadata']['type'] + if not plugin_to_save.get('type'): + if plugin_to_save.get('metadata', {}).get('type'): + plugin_to_save['type'] = plugin_to_save['metadata']['type'] else: - plugin['type'] = 'unknown' # Default type + plugin_to_save['type'] = 'unknown' # Default type - debug_print(f"Plugin build: {_redact_plugin_for_logging(plugin)}") - 
validation_error = validate_plugin(plugin) + debug_print(f"Plugin build: {_redact_plugin_for_logging(plugin_to_save)}") + validation_error = validate_plugin(plugin_to_save) if validation_error: return jsonify({'error': f'Plugin validation failed: {validation_error}'}), 400 - filtered_plugins.append(plugin) - new_plugin_names.add(plugin['name']) - if plugin.get('id'): - new_plugin_ids.add(plugin['id']) + filtered_plugins.append(plugin_to_save) + new_plugin_names.add(plugin_to_save['name']) + if plugin_to_save.get('id'): + new_plugin_ids.add(plugin_to_save['id']) # Save each plugin to the personal_actions container plugins_to_delete = [] diff --git a/application/single_app/route_frontend_conversations.py b/application/single_app/route_frontend_conversations.py index a5d3f261..4ffc2371 100644 --- a/application/single_app/route_frontend_conversations.py +++ b/application/single_app/route_frontend_conversations.py @@ -4,6 +4,7 @@ from functions_authentication import * from functions_debug import debug_print from functions_chat import sort_messages_by_thread +from functions_message_artifacts import filter_assistant_artifact_items from swagger_wrapper import swagger_route, get_auth_security def register_route_frontend_conversations(app): @@ -53,6 +54,7 @@ def view_conversation(conversation_id): query=message_query, partition_key=conversation_id )) + messages = filter_assistant_artifact_items(messages) return render_template('chat.html', conversation_id=conversation_id, messages=messages) @app.route('/conversation//messages', methods=['GET']) @@ -78,6 +80,7 @@ def get_conversation_messages(conversation_id): query=msg_query, partition_key=conversation_id )) + all_items = filter_assistant_artifact_items(all_items) debug_print(f"Frontend endpoint - Query returned {len(all_items)} total items (before filtering)") diff --git a/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py 
index fd5e2597..cf7fc663 100644 --- a/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py +++ b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py @@ -12,9 +12,10 @@ import io import json import logging +import re import warnings import pandas -from typing import Annotated, Optional, List +from typing import Annotated, Dict, List, Optional, Set from semantic_kernel.functions import kernel_function from semantic_kernel_plugins.plugin_invocation_logger import plugin_function_logger from functions_appinsights import log_event @@ -38,9 +39,13 @@ class TabularProcessingPlugin: ) ANALYSIS_FUNCTION_NAMES = ( 'lookup_value', + 'get_distinct_values', + 'count_rows', 'aggregate_column', 'filter_rows', 'query_tabular_data', + 'filter_rows_by_related_values', + 'count_rows_by_related_values', 'group_by_aggregate', 'group_by_datetime_component', ) @@ -73,6 +78,32 @@ class TabularProcessingPlugin: 'November', 'December' ] + RELATIONSHIP_COLUMN_HINT_TOKENS = { + 'account', + 'alias', + 'assignee', + 'category', + 'customer', + 'email', + 'employee', + 'engineer', + 'group', + 'id', + 'manager', + 'member', + 'name', + 'owner', + 'person', + 'resource', + 'se', + 'solution', + 'team', + 'user', + } + RELATIONSHIP_HINT_LIMIT = 10 + RELATIONSHIP_VALUE_SAMPLE_LIMIT = 500 + RELATIONSHIP_SHARED_VALUE_LIMIT = 5 + SOURCE_VALUE_MATCH_COUNT_LIMIT = 100 def __init__(self): self._df_cache = {} # Per-instance cache: (container, blob_name, sheet_name) -> DataFrame @@ -239,6 +270,7 @@ def _filter_rows_across_sheets( column: str, operator_str: str, value: str, + normalize_match: bool = False, max_rows: int = 100, ) -> Optional[str]: """Search for matching rows across all sheets that contain the requested column. 
@@ -271,40 +303,14 @@ def _filter_rows_across_sheets( continue sheets_searched.append(sheet) - series = df[column] - op = operator_str.strip().lower() - - numeric_value = None try: - numeric_value = float(value) - except (ValueError, TypeError): - pass - - if op in ('==', 'equals'): - if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): - mask = series == numeric_value - else: - mask = series.astype(str).str.lower() == value.lower() - elif op == '!=': - if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): - mask = series != numeric_value - else: - mask = series.astype(str).str.lower() != value.lower() - elif op == '>': - mask = series > numeric_value if numeric_value is not None else pandas.Series([False] * len(series)) - elif op == '<': - mask = series < numeric_value if numeric_value is not None else pandas.Series([False] * len(series)) - elif op == '>=': - mask = series >= numeric_value if numeric_value is not None else pandas.Series([False] * len(series)) - elif op == '<=': - mask = series <= numeric_value if numeric_value is not None else pandas.Series([False] * len(series)) - elif op == 'contains': - mask = series.astype(str).str.contains(value, case=False, na=False) - elif op == 'startswith': - mask = series.astype(str).str.lower().str.startswith(value.lower()) - elif op == 'endswith': - mask = series.astype(str).str.lower().str.endswith(value.lower()) - else: + mask = self._build_series_match_mask( + df[column], + operator_str, + value, + normalize_match=normalize_match, + ) + except ValueError: continue sheet_matches = int(mask.sum()) @@ -350,6 +356,7 @@ def _lookup_value_across_sheets( lookup_value_str: str, target_column: Optional[str] = None, match_operator: str = "equals", + normalize_match: bool = False, max_rows: int = 25, ) -> Optional[str]: """Look up matching rows across all sheets that contain the lookup column. 
@@ -384,18 +391,20 @@ def _lookup_value_across_sheets( continue sheets_searched.append(sheet) - series = df[lookup_column] - - if operator in {'equals', '=='}: - mask = series.astype(str).str.lower() == normalized_lookup_value.lower() - elif operator == 'contains': - mask = series.astype(str).str.contains(normalized_lookup_value, case=False, na=False) - elif operator == 'startswith': - mask = series.astype(str).str.lower().str.startswith(normalized_lookup_value.lower()) - elif operator == 'endswith': - mask = series.astype(str).str.lower().str.endswith(normalized_lookup_value.lower()) - else: - mask = series.astype(str).str.lower() == normalized_lookup_value.lower() + try: + mask = self._build_series_match_mask( + df[lookup_column], + operator, + normalized_lookup_value, + normalize_match=normalize_match, + ) + except ValueError: + mask = self._build_series_match_mask( + df[lookup_column], + 'equals', + normalized_lookup_value, + normalize_match=normalize_match, + ) sheet_matches = int(mask.sum()) if sheet_matches == 0: @@ -465,6 +474,7 @@ def _query_tabular_data_across_sheets( sheets_searched = [] sheets_matched = [] total_matches = 0 + query_errors = [] for sheet in available_sheets: df = self._read_tabular_blob_to_dataframe( @@ -476,8 +486,11 @@ def _query_tabular_data_across_sheets( try: result_df = df.query(query_expression) - except Exception: - # Query expression references columns not in this sheet โ€” skip + except Exception as query_error: + query_errors.append({ + 'sheet_name': sheet, + 'error': str(query_error), + }) continue sheets_searched.append(sheet) @@ -494,6 +507,29 @@ def _query_tabular_data_across_sheets( combined_results.append(row) if not sheets_searched: + if query_errors: + unique_errors = [] + seen_errors = set() + for query_error in query_errors: + normalized_error = str(query_error.get('error') or '').strip() + if not normalized_error or normalized_error in seen_errors: + continue + seen_errors.add(normalized_error) + 
unique_errors.append(normalized_error) + + return json.dumps({ + "error": ( + "Query error: the expression could not be evaluated on any worksheet during cross-sheet search. " + "Use simple DataFrame.query() syntax with existing column names, or provide sheet_name to target a specific worksheet." + ), + "filename": filename, + "selected_sheet": "ALL (cross-sheet search)", + "query_expression": query_expression, + "sheets_evaluated": [ + query_error['sheet_name'] for query_error in query_errors + ], + "details": unique_errors[:3], + }, indent=2, default=str) return None log_event( @@ -514,6 +550,487 @@ def _query_tabular_data_across_sheets( "data": combined_results, }, indent=2, default=str) + def _count_rows_across_sheets( + self, + container_name: str, + blob_name: str, + filename: str, + filter_column: Optional[str] = None, + filter_operator: str = 'equals', + filter_value=None, + query_expression: Optional[str] = None, + normalize_match: bool = False, + ) -> Optional[str]: + """Count rows across all worksheets that satisfy optional filters or queries.""" + workbook_metadata = self._get_workbook_metadata(container_name, blob_name) + if not workbook_metadata.get('is_workbook'): + return None + + available_sheets = workbook_metadata.get('sheet_names', []) + if len(available_sheets) <= 1: + return None + + sheets_searched = [] + sheets_matched = [] + total_count = 0 + query_errors = [] + applied_filters = [] + + for sheet in available_sheets: + df = self._read_tabular_blob_to_dataframe( + container_name, + blob_name, + sheet_name=sheet, + ) + df = self._try_numeric_conversion(df) + + try: + filtered_df, sheet_filters = self._apply_optional_dataframe_filters( + df, + query_expression=query_expression, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + normalize_match=normalize_match, + ) + except KeyError: + continue + except Exception as query_error: + if query_expression: + query_errors.append({ + 'sheet_name': sheet, + 
'error': str(query_error), + }) + continue + + sheets_searched.append(sheet) + applied_filters = sheet_filters or applied_filters + sheet_count = len(filtered_df) + total_count += sheet_count + if sheet_count > 0: + sheets_matched.append(sheet) + + if not sheets_searched: + if query_errors: + unique_errors = [] + seen_errors = set() + for query_error in query_errors: + normalized_error = str(query_error.get('error') or '').strip() + if not normalized_error or normalized_error in seen_errors: + continue + seen_errors.add(normalized_error) + unique_errors.append(normalized_error) + + return json.dumps({ + 'error': ( + 'Query error: the expression could not be evaluated on any worksheet during cross-sheet row counting. ' + 'Use simple DataFrame.query() syntax with existing column names, or provide sheet_name to target a specific worksheet.' + ), + 'filename': filename, + 'selected_sheet': 'ALL (cross-sheet search)', + 'query_expression': query_expression, + 'details': unique_errors[:3], + }, indent=2, default=str) + return None + + return json.dumps({ + 'filename': filename, + 'selected_sheet': 'ALL (cross-sheet search)', + 'sheets_searched': sheets_searched, + 'sheets_matched': sheets_matched, + 'filter_applied': applied_filters, + 'row_count': total_count, + 'normalize_match': normalize_match, + }, indent=2, default=str) + + def _get_distinct_values_across_sheets( + self, + container_name: str, + blob_name: str, + filename: str, + column: str, + query_expression: Optional[str] = None, + filter_column: Optional[str] = None, + filter_operator: str = 'equals', + filter_value=None, + normalize_match: bool = False, + max_values: int = 100, + ) -> Optional[str]: + """Return distinct values for a column across all worksheets that contain it.""" + workbook_metadata = self._get_workbook_metadata(container_name, blob_name) + if not workbook_metadata.get('is_workbook'): + return None + + available_sheets = workbook_metadata.get('sheet_names', []) + if len(available_sheets) <= 
1: + return None + + sheets_searched = [] + sheets_matched = [] + distinct_display_values = {} + query_errors = [] + applied_filters = [] + + for sheet in available_sheets: + df = self._read_tabular_blob_to_dataframe( + container_name, + blob_name, + sheet_name=sheet, + ) + if column not in df.columns: + continue + + try: + filtered_df, sheet_filters = self._apply_optional_dataframe_filters( + df, + query_expression=query_expression, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + normalize_match=normalize_match, + ) + except KeyError: + continue + except Exception as query_error: + if query_expression: + query_errors.append({ + 'sheet_name': sheet, + 'error': str(query_error), + }) + continue + + sheets_searched.append(sheet) + applied_filters = sheet_filters or applied_filters + for cell_value in filtered_df[column].tolist(): + display_value = str(cell_value).strip() + if not display_value: + continue + compare_variants = self._extract_cell_value_variants( + cell_value, + normalize_match=normalize_match, + ) + if not compare_variants: + continue + canonical_key = sorted(compare_variants)[0] + distinct_display_values.setdefault(canonical_key, display_value) + + if not filtered_df.empty: + sheets_matched.append(sheet) + + if not sheets_searched: + if query_errors: + unique_errors = [] + seen_errors = set() + for query_error in query_errors: + normalized_error = str(query_error.get('error') or '').strip() + if not normalized_error or normalized_error in seen_errors: + continue + seen_errors.add(normalized_error) + unique_errors.append(normalized_error) + + return json.dumps({ + 'error': ( + 'Query error: the expression could not be evaluated on any worksheet during cross-sheet distinct-value discovery. ' + 'Use simple DataFrame.query() syntax with existing column names, or provide sheet_name to target a specific worksheet.' 
+ ), + 'filename': filename, + 'selected_sheet': 'ALL (cross-sheet search)', + 'query_expression': query_expression, + 'details': unique_errors[:3], + }, indent=2, default=str) + return None + + ordered_values = sorted(distinct_display_values.values(), key=lambda item: item.casefold()) + return json.dumps({ + 'filename': filename, + 'selected_sheet': 'ALL (cross-sheet search)', + 'column': column, + 'sheets_searched': sheets_searched, + 'sheets_matched': sheets_matched, + 'filter_applied': applied_filters, + 'normalize_match': normalize_match, + 'distinct_count': len(ordered_values), + 'returned_values': min(len(ordered_values), int(max_values)), + 'values': ordered_values[:int(max_values)], + 'values_limited': len(ordered_values) > int(max_values), + }, indent=2, default=str) + + def _evaluate_related_value_membership( + self, + container_name: str, + blob_name: str, + filename: str, + source_sheet_name: str, + source_value_column: str, + target_sheet_name: str, + target_match_column: str, + source_sheet_index: Optional[str] = None, + target_sheet_index: Optional[str] = None, + source_query_expression: Optional[str] = None, + source_filter_column: Optional[str] = None, + source_filter_operator: str = 'equals', + source_filter_value=None, + target_query_expression: Optional[str] = None, + target_filter_column: Optional[str] = None, + target_filter_operator: str = 'equals', + target_filter_value=None, + source_alias_column: Optional[str] = None, + target_alias_column: Optional[str] = None, + normalize_match: bool = True, + max_rows: int = 100, + ) -> dict: + """Evaluate a semi-join between a source cohort and a target fact worksheet.""" + source_sheet, workbook_metadata = self._resolve_sheet_selection( + container_name, + blob_name, + sheet_name=source_sheet_name, + sheet_index=source_sheet_index, + require_explicit_sheet=True, + ) + target_sheet, workbook_metadata = self._resolve_sheet_selection( + container_name, + blob_name, + sheet_name=target_sheet_name, + 
sheet_index=target_sheet_index, + require_explicit_sheet=True, + ) + + source_df = self._read_tabular_blob_to_dataframe( + container_name, + blob_name, + sheet_name=source_sheet, + require_explicit_sheet=True, + ) + target_df = self._read_tabular_blob_to_dataframe( + container_name, + blob_name, + sheet_name=target_sheet, + require_explicit_sheet=True, + ) + + source_df = self._try_numeric_conversion(source_df) + target_df = self._try_numeric_conversion(target_df) + + source_required_columns = [source_value_column] + if source_alias_column: + source_required_columns.append(source_alias_column) + if source_filter_column: + source_required_columns.append(source_filter_column) + + target_required_columns = [target_match_column] + if target_alias_column: + target_required_columns.append(target_alias_column) + if target_filter_column: + target_required_columns.append(target_filter_column) + + for required_column in source_required_columns: + if required_column not in source_df.columns: + return self._build_missing_column_error_payload( + container_name, + blob_name, + filename, + workbook_metadata, + source_sheet, + required_column, + related_columns=[source_value_column, source_alias_column, source_filter_column], + available_columns=list(source_df.columns), + ) + + for required_column in target_required_columns: + if required_column not in target_df.columns: + return self._build_missing_column_error_payload( + container_name, + blob_name, + filename, + workbook_metadata, + target_sheet, + required_column, + related_columns=[target_match_column, target_alias_column, target_filter_column], + available_columns=list(target_df.columns), + ) + + try: + filtered_source_df, source_filters = self._apply_optional_dataframe_filters( + source_df, + query_expression=source_query_expression, + filter_column=source_filter_column, + filter_operator=source_filter_operator, + filter_value=source_filter_value, + normalize_match=normalize_match, + ) + except KeyError as 
missing_source_column_error: + missing_source_column = str(missing_source_column_error).strip("'") + return self._build_missing_column_error_payload( + container_name, + blob_name, + filename, + workbook_metadata, + source_sheet, + missing_source_column, + related_columns=[source_value_column, source_alias_column], + available_columns=list(source_df.columns), + ) + except Exception as source_filter_error: + return { + 'error': f"Source filter/query error on sheet '{source_sheet}': {source_filter_error}", + 'filename': filename, + 'selected_sheet': source_sheet if workbook_metadata.get('is_workbook') else None, + } + + try: + filtered_target_df, target_filters = self._apply_optional_dataframe_filters( + target_df, + query_expression=target_query_expression, + filter_column=target_filter_column, + filter_operator=target_filter_operator, + filter_value=target_filter_value, + normalize_match=normalize_match, + ) + except KeyError as missing_target_column_error: + missing_target_column = str(missing_target_column_error).strip("'") + return self._build_missing_column_error_payload( + container_name, + blob_name, + filename, + workbook_metadata, + target_sheet, + missing_target_column, + related_columns=[target_match_column, target_alias_column], + available_columns=list(target_df.columns), + ) + except Exception as target_filter_error: + return { + 'error': f"Target filter/query error on sheet '{target_sheet}': {target_filter_error}", + 'filename': filename, + 'selected_sheet': target_sheet if workbook_metadata.get('is_workbook') else None, + } + + source_member_map = {} + variant_to_source_keys = {} + for _, source_row in filtered_source_df.iterrows(): + display_value = str(source_row.get(source_value_column, '')).strip() + value_variants = set() + for column_name in [source_value_column, source_alias_column]: + if not column_name: + continue + value_variants.update( + self._extract_cell_value_variants( + source_row.get(column_name), + normalize_match=normalize_match, + 
) + ) + + if not value_variants: + continue + + primary_variants = self._extract_cell_value_variants( + source_row.get(source_value_column), + normalize_match=normalize_match, + ) + primary_key = sorted(primary_variants or value_variants)[0] + existing_member = source_member_map.setdefault( + primary_key, + { + 'display_value': display_value or primary_key, + 'value_variants': set(), + 'matched_target_row_count': 0, + }, + ) + existing_member['value_variants'].update(value_variants) + for value_variant in value_variants: + variant_to_source_keys.setdefault(value_variant, set()).add(primary_key) + + source_compare_values = set() + for source_member in source_member_map.values(): + source_compare_values.update(source_member['value_variants']) + + matched_target_rows = [] + matched_target_value_variants = set() + for _, target_row in filtered_target_df.iterrows(): + target_value_variants = set() + for column_name in [target_match_column, target_alias_column]: + if not column_name: + continue + target_value_variants.update( + self._extract_cell_value_variants( + target_row.get(column_name), + normalize_match=normalize_match, + ) + ) + + matched_variants = sorted(target_value_variants & source_compare_values) + if not matched_variants: + continue + + matched_source_keys = sorted({ + source_key + for matched_variant in matched_variants + for source_key in variant_to_source_keys.get(matched_variant, set()) + }) + if not matched_source_keys: + continue + + matched_target_value_variants.update(matched_variants) + for source_key in matched_source_keys: + source_member_map[source_key]['matched_target_row_count'] += 1 + + matched_row_payload = target_row.to_dict() + matched_row_payload['_matched_on'] = matched_variants[:3] + matched_row_payload['_matched_source_values'] = [ + source_member_map[source_key]['display_value'] + for source_key in matched_source_keys[:3] + ] + matched_target_rows.append(matched_row_payload) + + matched_source_values = [] + unmatched_source_values = 
[] + for source_member in source_member_map.values(): + if source_member['matched_target_row_count'] > 0: + matched_source_values.append(source_member['display_value']) + else: + unmatched_source_values.append(source_member['display_value']) + + source_value_match_counts = sorted( + [ + { + 'source_value': source_member['display_value'], + 'matched_target_row_count': source_member['matched_target_row_count'], + } + for source_member in source_member_map.values() + ], + key=lambda item: (-item['matched_target_row_count'], item['source_value'].casefold()) + ) + source_value_match_count_limit = self.SOURCE_VALUE_MATCH_COUNT_LIMIT + + matched_target_row_count = len(matched_target_rows) + return { + 'filename': filename, + 'selected_sheet': target_sheet if workbook_metadata.get('is_workbook') else None, + 'relationship_type': 'set_membership', + 'source_sheet': source_sheet, + 'source_value_column': source_value_column, + 'source_alias_column': source_alias_column, + 'target_sheet': target_sheet, + 'target_match_column': target_match_column, + 'target_alias_column': target_alias_column, + 'normalize_match': normalize_match, + 'source_filter_applied': source_filters, + 'target_filter_applied': target_filters, + 'source_cohort_size': len(source_member_map), + 'matched_source_value_count': len(matched_source_values), + 'matched_source_values_sample': matched_source_values[:10], + 'unmatched_source_value_count': len(unmatched_source_values), + 'unmatched_source_values_sample': unmatched_source_values[:10], + 'source_value_match_counts_returned': min(len(source_value_match_counts), source_value_match_count_limit), + 'source_value_match_counts_limited': len(source_value_match_counts) > source_value_match_count_limit, + 'source_value_match_counts': source_value_match_counts[:source_value_match_count_limit], + 'target_rows_scanned': len(filtered_target_df), + 'matched_target_row_count': matched_target_row_count, + 'returned_rows': min(matched_target_row_count, max_rows), + 
'rows_limited': matched_target_row_count > max_rows, + 'data': matched_target_rows[:max_rows], + } + def _format_datetime_column_label(self, value) -> str: """Render date-like Excel header labels into stable analysis-friendly strings.""" timestamp_value = pandas.Timestamp(value) @@ -566,6 +1083,394 @@ def _normalize_dataframe_columns(self, df: pandas.DataFrame) -> pandas.DataFrame normalized_df.columns = normalized_columns return normalized_df + def _normalize_entity_match_text(self, value) -> Optional[str]: + """Normalize entity-style text for stable name and owner comparisons.""" + if value is None or (not isinstance(value, str) and pandas.isna(value)): + return None + + normalized = str(value).casefold().replace('&', ' and ') + normalized = re.sub(r"[`'\u2018\u2019\u201b]+", '', normalized) + normalized = re.sub(r'[^0-9a-z]+', ' ', normalized) + normalized = ' '.join(normalized.split()) + return normalized or None + + def _extract_cell_value_variants(self, value, normalize_match: bool = False) -> Set[str]: + """Return one or more comparable value variants from a cell, including aliases.""" + if value is None or (not isinstance(value, str) and pandas.isna(value)): + return set() + + raw_text = str(value).strip() + if not raw_text: + return set() + + parts = [raw_text] + if ';' in raw_text or '|' in raw_text: + split_parts = re.split(r'[;|]+', raw_text) + parts = [part.strip() for part in split_parts if part.strip()] + + variants = set() + for part in parts: + if normalize_match: + normalized_part = self._normalize_entity_match_text(part) + else: + normalized_part = part.casefold().strip() + if normalized_part: + variants.add(normalized_part) + + return variants + + def _build_series_match_mask( + self, + series: pandas.Series, + operator: str, + value, + normalize_match: bool = False, + ) -> pandas.Series: + """Build a boolean mask for a comparison against a DataFrame column.""" + op = (operator or 'equals').strip().lower() + + numeric_value = None + try: + 
numeric_value = float(value) + except (ValueError, TypeError): + numeric_value = None + + if op in {'==', 'equals'}: + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + return series == numeric_value + if normalize_match: + normalized_value = self._normalize_entity_match_text(value) + normalized_series = series.map(self._normalize_entity_match_text) + return normalized_series == normalized_value + return series.astype(str).str.lower() == str(value).lower() + + if op == '!=': + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + return series != numeric_value + if normalize_match: + normalized_value = self._normalize_entity_match_text(value) + normalized_series = series.map(self._normalize_entity_match_text) + return normalized_series != normalized_value + return series.astype(str).str.lower() != str(value).lower() + + if op == '>': + if numeric_value is None: + return pandas.Series([False] * len(series), index=series.index) + return series > numeric_value + + if op == '<': + if numeric_value is None: + return pandas.Series([False] * len(series), index=series.index) + return series < numeric_value + + if op == '>=': + if numeric_value is None: + return pandas.Series([False] * len(series), index=series.index) + return series >= numeric_value + + if op == '<=': + if numeric_value is None: + return pandas.Series([False] * len(series), index=series.index) + return series <= numeric_value + + if normalize_match: + normalized_value = self._normalize_entity_match_text(value) + normalized_series = series.map(self._normalize_entity_match_text).fillna('') + if not normalized_value: + return pandas.Series([False] * len(series), index=series.index) + + if op == 'contains': + return normalized_series.str.contains(normalized_value, regex=False, na=False) + if op == 'startswith': + return normalized_series.str.startswith(normalized_value, na=False) + if op == 'endswith': + return normalized_series.str.endswith(normalized_value, 
na=False) + else: + text_series = series.astype(str) + value_text = str(value) + if op == 'contains': + return text_series.str.contains(value_text, case=False, na=False) + if op == 'startswith': + return text_series.str.lower().str.startswith(value_text.lower()) + if op == 'endswith': + return text_series.str.lower().str.endswith(value_text.lower()) + + raise ValueError(f"Unsupported operator: {operator}") + + def _apply_optional_dataframe_filters( + self, + df: pandas.DataFrame, + query_expression: Optional[str] = None, + filter_column: Optional[str] = None, + filter_operator: str = 'equals', + filter_value=None, + normalize_match: bool = False, + ) -> tuple: + """Apply optional query and single-column filters to a DataFrame.""" + filtered_df = df + applied_filters = [] + + if query_expression: + filtered_df = filtered_df.query(query_expression) + applied_filters.append(f"query_expression={query_expression}") + + if filter_column: + if filter_column not in filtered_df.columns: + raise KeyError(filter_column) + if filter_value is None: + raise ValueError('filter_value is required when filter_column is provided.') + + mask = self._build_series_match_mask( + filtered_df[filter_column], + filter_operator, + filter_value, + normalize_match=normalize_match, + ) + filtered_df = filtered_df[mask] + applied_filters.append( + f"{filter_column} {filter_operator or 'equals'} {filter_value}" + + (' [normalized]' if normalize_match else '') + ) + + return filtered_df, applied_filters + + def _column_name_tokens(self, column_name: str) -> List[str]: + """Tokenize a column label for relationship heuristics.""" + normalized = re.sub(r'[^0-9a-z]+', ' ', str(column_name or '').casefold()) + return [token for token in normalized.split() if token] + + def _build_relationship_column_profile(self, column_name: str, series: pandas.Series) -> dict: + """Build a compact column profile used for workbook relationship inference.""" + tokens = self._column_name_tokens(column_name) + 
non_null_series = series.dropna() + name_hint_score = sum(1 for token in tokens if token in self.RELATIONSHIP_COLUMN_HINT_TOKENS) + is_identifier_like = any(token == 'id' or token.endswith('id') for token in tokens) + is_entity_like = name_hint_score > 0 or is_identifier_like + distinct_count = int(non_null_series.astype(str).nunique(dropna=True)) if not non_null_series.empty else 0 + + normalized_values = [] + seen_values = set() + for raw_value in non_null_series.astype(str): + normalized_value = self._normalize_entity_match_text(raw_value) + if not normalized_value or normalized_value in seen_values: + continue + seen_values.add(normalized_value) + normalized_values.append(normalized_value) + if len(normalized_values) >= self.RELATIONSHIP_VALUE_SAMPLE_LIMIT: + break + + return { + 'column_name': column_name, + 'normalized_column_name': ' '.join(tokens), + 'tokens': tokens, + 'name_hint_score': name_hint_score, + 'is_identifier_like': is_identifier_like, + 'is_entity_like': is_entity_like, + 'distinct_count': distinct_count, + 'normalized_value_set': set(normalized_values), + 'sample_distinct_count': len(normalized_values), + 'is_numeric': pandas.api.types.is_numeric_dtype(series), + } + + def _infer_sheet_role_hint(self, sheet_name: str, row_count: int, column_profiles: List[dict], max_row_count: int) -> dict: + """Infer whether a worksheet looks like a fact, dimension, or metadata table.""" + normalized_sheet_name = str(sheet_name or '').casefold() + entity_profiles = [profile for profile in column_profiles if profile['is_entity_like']] + natural_key_profiles = [ + profile for profile in entity_profiles + if profile['distinct_count'] >= max(2, row_count - 1) + ] + repeated_entity_profiles = [ + profile for profile in entity_profiles + if 0 < profile['distinct_count'] < row_count + ] + join_columns = [ + profile['column_name'] + for profile in sorted( + column_profiles, + key=lambda item: (-item['name_hint_score'], item['distinct_count'], 
item['column_name'].casefold()) + ) + if profile['is_entity_like'] + ][:5] + + if any(token in normalized_sheet_name for token in ('meta', 'config', 'summary', 'readme', 'about')): + return { + 'role': 'metadata', + 'reason': 'sheet name suggests metadata or workbook configuration', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + if row_count >= max(25, max_row_count // 2 or 1) and repeated_entity_profiles: + return { + 'role': 'fact', + 'reason': 'larger table contains repeated entity values consistent with transactional rows', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + if natural_key_profiles and row_count <= max(200, max_row_count // 4 or 1): + return { + 'role': 'dimension', + 'reason': 'smaller table contains near-unique entity keys suitable for cohort or lookup joins', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + if row_count <= 5 and not entity_profiles and len(column_profiles) <= 6: + return { + 'role': 'metadata', + 'reason': 'very small table with limited columns', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + if join_columns and row_count <= max(200, max_row_count // 4 or 1): + return { + 'role': 'dimension', + 'reason': 'smaller table with entity-like or identifier-like columns', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + if row_count >= max(25, max_row_count // 2 or 1): + return { + 'role': 'fact', + 'reason': 'larger table likely containing repeated transactional or milestone-style rows', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + return { + 'role': 'unknown', + 'reason': 'insufficient evidence to confidently classify worksheet role', + 'row_count': row_count, + 'likely_join_columns': join_columns, + } + + def _build_workbook_relationship_metadata(self, sheet_dataframes: Dict[str, pandas.DataFrame]) -> dict: + """Infer likely worksheet roles and relationship hints from workbook 
data.""" + if len(sheet_dataframes) <= 1: + return { + 'sheet_role_hints': {}, + 'relationship_hints': [], + } + + max_row_count = max((len(dataframe) for dataframe in sheet_dataframes.values()), default=0) + column_profiles_by_sheet = {} + sheet_role_hints = {} + + for sheet_name, dataframe in sheet_dataframes.items(): + column_profiles = [ + self._build_relationship_column_profile(column_name, dataframe[column_name]) + for column_name in dataframe.columns + ] + candidate_profiles = [ + profile for profile in column_profiles + if profile['is_entity_like'] + or profile['distinct_count'] <= max(50, len(dataframe)) + ] + column_profiles_by_sheet[sheet_name] = candidate_profiles[:8] + sheet_role_hints[sheet_name] = self._infer_sheet_role_hint( + sheet_name, + len(dataframe), + candidate_profiles, + max_row_count, + ) + + relationship_hints = [] + sheet_names = list(sheet_dataframes.keys()) + for left_index, left_sheet in enumerate(sheet_names): + for right_sheet in sheet_names[left_index + 1:]: + left_profiles = column_profiles_by_sheet.get(left_sheet, []) + right_profiles = column_profiles_by_sheet.get(right_sheet, []) + left_role_hint = sheet_role_hints.get(left_sheet, {}) + right_role_hint = sheet_role_hints.get(right_sheet, {}) + + for left_profile in left_profiles: + for right_profile in right_profiles: + overlap_values = sorted( + left_profile['normalized_value_set'] & right_profile['normalized_value_set'] + ) + overlap_count = len(overlap_values) + token_overlap_count = len( + set(left_profile['tokens']) & set(right_profile['tokens']) + ) + exact_column_name_match = ( + left_profile['normalized_column_name'] + and left_profile['normalized_column_name'] == right_profile['normalized_column_name'] + ) + + if overlap_count == 0: + continue + if overlap_count < 2 and not exact_column_name_match and token_overlap_count == 0: + continue + + left_distinct = max(1, left_profile['sample_distinct_count']) + right_distinct = max(1, right_profile['sample_distinct_count']) + 
overlap_ratio_vs_smaller = round(overlap_count / min(left_distinct, right_distinct), 3) + + if left_role_hint.get('role') == 'dimension' and right_role_hint.get('role') == 'fact': + reference_sheet = left_sheet + reference_profile = left_profile + reference_role = left_role_hint.get('role') + fact_sheet = right_sheet + fact_profile = right_profile + fact_role = right_role_hint.get('role') + elif right_role_hint.get('role') == 'dimension' and left_role_hint.get('role') == 'fact': + reference_sheet = right_sheet + reference_profile = right_profile + reference_role = right_role_hint.get('role') + fact_sheet = left_sheet + fact_profile = left_profile + fact_role = left_role_hint.get('role') + elif len(sheet_dataframes[left_sheet]) <= len(sheet_dataframes[right_sheet]): + reference_sheet = left_sheet + reference_profile = left_profile + reference_role = left_role_hint.get('role') + fact_sheet = right_sheet + fact_profile = right_profile + fact_role = right_role_hint.get('role') + else: + reference_sheet = right_sheet + reference_profile = right_profile + reference_role = right_role_hint.get('role') + fact_sheet = left_sheet + fact_profile = left_profile + fact_role = left_role_hint.get('role') + + relationship_hints.append({ + 'reference_sheet': reference_sheet, + 'reference_column': reference_profile['column_name'], + 'reference_role': reference_role, + 'fact_sheet': fact_sheet, + 'fact_column': fact_profile['column_name'], + 'fact_role': fact_role, + 'normalized_overlap_count': overlap_count, + 'reference_distinct_count': reference_profile['sample_distinct_count'], + 'fact_distinct_count': fact_profile['sample_distinct_count'], + 'overlap_ratio_vs_smaller_set': overlap_ratio_vs_smaller, + 'exact_column_name_match': bool(exact_column_name_match), + 'shared_name_token_count': token_overlap_count, + 'sample_overlap_values': overlap_values[:self.RELATIONSHIP_SHARED_VALUE_LIMIT], + }) + + relationship_hints.sort( + key=lambda item: ( + -item['normalized_overlap_count'], + 
-item['overlap_ratio_vs_smaller_set'], + -int(item['exact_column_name_match']), + -item['shared_name_token_count'], + item['reference_sheet'].casefold(), + item['fact_sheet'].casefold(), + ) + ) + + return { + 'sheet_role_hints': sheet_role_hints, + 'relationship_hints': relationship_hints[:self.RELATIONSHIP_HINT_LIMIT], + } + def _build_sheet_schema_summary(self, df: pandas.DataFrame, sheet_name: Optional[str], preview_rows: int = 3) -> dict: """Build a compact schema summary for a single table or worksheet.""" df = self._normalize_dataframe_columns(df) @@ -595,18 +1500,22 @@ def _build_workbook_schema_summary(self, container_name: str, blob_name: str, fi return summary per_sheet_schemas = {} + sheet_dataframes = {} for workbook_sheet_name in workbook_metadata.get('sheet_names', []): df = self._read_tabular_blob_to_dataframe( container_name, blob_name, sheet_name=workbook_sheet_name, ) + sheet_dataframes[workbook_sheet_name] = df.copy() per_sheet_schemas[workbook_sheet_name] = self._build_sheet_schema_summary( df, workbook_sheet_name, preview_rows=preview_rows, ) + relationship_metadata = self._build_workbook_relationship_metadata(sheet_dataframes) + return { 'filename': filename, 'is_workbook': True, @@ -614,6 +1523,8 @@ def _build_workbook_schema_summary(self, container_name: str, blob_name: str, fi 'sheet_count': workbook_metadata.get('sheet_count', 0), 'selected_sheet': None, 'per_sheet_schemas': per_sheet_schemas, + 'sheet_role_hints': relationship_metadata.get('sheet_role_hints', {}), + 'relationship_hints': relationship_metadata.get('relationship_hints', []), } def _find_candidate_sheets_for_columns( @@ -1233,6 +2144,7 @@ async def lookup_value( lookup_value: Annotated[str, "The row label/category value to search for, such as Total Assets"], target_column: Annotated[str, "The target column containing the desired value, such as Nov-25"], match_operator: Annotated[str, "Match operator: equals, contains, startswith, endswith"] = "equals", + normalize_match: 
Annotated[str, "Whether to normalize string/entity matching for text comparisons (true/false)"] = "false", sheet_name: Annotated[Optional[str], "Optional worksheet name for Excel files. Required for analytical calls on multi-sheet workbooks unless sheet_index is provided."] = None, sheet_index: Annotated[Optional[str], "Optional zero-based worksheet index for Excel files. Ignored when sheet_name is provided."] = None, source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", @@ -1243,6 +2155,7 @@ async def lookup_value( """Look up values from a target column for matching rows.""" def _sync_work(): try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=False) container, blob_path = self._resolve_blob_location_with_fallback( user_id, conversation_id, filename, source, group_id=group_id, public_workspace_id=public_workspace_id @@ -1255,6 +2168,7 @@ def _sync_work(): container, blob_path, filename, lookup_column, lookup_value, target_column, match_operator=match_operator, + normalize_match=normalize_match_flag, max_rows=int(max_rows), ) if cross_sheet_result is not None: @@ -1301,19 +2215,17 @@ def _sync_work(): ) ) - series = df[lookup_column] operator = (match_operator or 'equals').strip().lower() normalized_lookup_value = str(lookup_value) - if operator in {'equals', '=='}: - mask = series.astype(str).str.lower() == normalized_lookup_value.lower() - elif operator == 'contains': - mask = series.astype(str).str.contains(normalized_lookup_value, case=False, na=False) - elif operator == 'startswith': - mask = series.astype(str).str.lower().str.startswith(normalized_lookup_value.lower()) - elif operator == 'endswith': - mask = series.astype(str).str.lower().str.endswith(normalized_lookup_value.lower()) - else: + try: + mask = self._build_series_match_mask( + df[lookup_column], + operator, + normalized_lookup_value, + normalize_match=normalize_match_flag, + ) + except ValueError: return json.dumps({"error": 
f"Unsupported match_operator: {match_operator}"}) limit = int(max_rows) @@ -1325,6 +2237,7 @@ def _sync_work(): "lookup_value": lookup_value, "target_column": target_column, "match_operator": operator, + "normalize_match": normalize_match_flag, "total_matches": int(mask.sum()), "returned_rows": len(matches), "data": matches.to_dict(orient='records'), @@ -1339,6 +2252,257 @@ def _sync_work(): return json.dumps({"error": str(e)}) return await asyncio.to_thread(_sync_work) + @kernel_function( + description=( + "Return deterministic distinct values for a column, with optional query_expression or filter criteria. " + "Use this to build a canonical cohort from a worksheet before counting or joining related rows." + ), + name="get_distinct_values" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def get_distinct_values( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + column: Annotated[str, "The column from which to return distinct values"], + query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to apply before collecting distinct values"] = None, + filter_column: Annotated[Optional[str], "Optional column to filter on before collecting distinct values"] = None, + filter_operator: Annotated[str, "Optional filter operator when filter_column is provided"] = "equals", + filter_value: Annotated[Optional[str], "Optional filter value when filter_column is provided"] = None, + normalize_match: Annotated[str, "Whether to normalize string/entity matching and deduplication (true/false)"] = "true", + sheet_name: Annotated[Optional[str], "Optional worksheet name for Excel files. 
When omitted, the plugin may perform a cross-sheet distinct-value search."] = None, + sheet_index: Annotated[Optional[str], "Optional zero-based worksheet index for Excel files. Ignored when sheet_name is provided."] = None, + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_values: Annotated[str, "Maximum distinct values to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result containing deterministic distinct values and counts"]: + """Return deterministic distinct values from a worksheet or across worksheets.""" + def _sync_work(): + try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=True) + container, blob_path = self._resolve_blob_location_with_fallback( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + + normalized_sheet = (sheet_name or '').strip() + normalized_sheet_idx = None if sheet_index is None else str(sheet_index).strip() + if not normalized_sheet and normalized_sheet_idx in (None, ''): + cross_sheet_result = self._get_distinct_values_across_sheets( + container, + blob_path, + filename, + column, + query_expression=query_expression, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + normalize_match=normalize_match_flag, + max_values=int(max_values), + ) + if cross_sheet_result is not None: + return cross_sheet_result + + selected_sheet, workbook_metadata = self._resolve_sheet_selection( + container, + blob_path, + sheet_name=sheet_name, + sheet_index=sheet_index, + require_explicit_sheet=True, + ) + df = self._read_tabular_blob_to_dataframe( + container, + blob_path, + sheet_name=selected_sheet, + require_explicit_sheet=True, + ) + + if column not in df.columns: + return 
json.dumps( + self._build_missing_column_error_payload( + container, + blob_path, + filename, + workbook_metadata, + selected_sheet, + column, + related_columns=[filter_column] if filter_column else None, + available_columns=list(df.columns), + ) + ) + + try: + filtered_df, applied_filters = self._apply_optional_dataframe_filters( + df, + query_expression=query_expression, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + normalize_match=normalize_match_flag, + ) + except KeyError as missing_column_error: + missing_column = str(missing_column_error).strip("'") + return json.dumps( + self._build_missing_column_error_payload( + container, + blob_path, + filename, + workbook_metadata, + selected_sheet, + missing_column, + related_columns=[column], + available_columns=list(df.columns), + ) + ) + except Exception as query_error: + return json.dumps({ + 'error': f"Query/filter error: {query_error}", + 'filename': filename, + 'selected_sheet': selected_sheet if workbook_metadata.get('is_workbook') else None, + }) + + distinct_display_values = {} + for cell_value in filtered_df[column].tolist(): + display_value = str(cell_value).strip() + if not display_value: + continue + compare_variants = self._extract_cell_value_variants( + cell_value, + normalize_match=normalize_match_flag, + ) + if not compare_variants: + continue + canonical_key = sorted(compare_variants)[0] + distinct_display_values.setdefault(canonical_key, display_value) + + ordered_values = sorted(distinct_display_values.values(), key=lambda item: item.casefold()) + limit = int(max_values) + return json.dumps({ + 'filename': filename, + 'selected_sheet': selected_sheet if workbook_metadata.get('is_workbook') else None, + 'column': column, + 'filter_applied': applied_filters, + 'normalize_match': normalize_match_flag, + 'distinct_count': len(ordered_values), + 'returned_values': min(len(ordered_values), limit), + 'values': ordered_values[:limit], + 'values_limited': 
len(ordered_values) > limit, + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error getting distinct values: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Return a deterministic row count after applying an optional query_expression or filter condition. " + "Use this instead of estimating counts from partial returned rows when the user asks how many or what percentage." + ), + name="count_rows" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def count_rows( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to apply before counting rows"] = None, + filter_column: Annotated[Optional[str], "Optional column to filter on before counting rows"] = None, + filter_operator: Annotated[str, "Optional filter operator when filter_column is provided"] = "equals", + filter_value: Annotated[Optional[str], "Optional filter value when filter_column is provided"] = None, + normalize_match: Annotated[str, "Whether to normalize string/entity matching for text comparisons (true/false)"] = "false", + sheet_name: Annotated[Optional[str], "Optional worksheet name for Excel files. When omitted, the plugin may perform a cross-sheet row count."] = None, + sheet_index: Annotated[Optional[str], "Optional zero-based worksheet index for Excel files. 
Ignored when sheet_name is provided."] = None, + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result containing a deterministic row count"]: + """Count rows deterministically after optional filters or queries.""" + def _sync_work(): + try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=False) + container, blob_path = self._resolve_blob_location_with_fallback( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + + normalized_sheet = (sheet_name or '').strip() + normalized_sheet_idx = None if sheet_index is None else str(sheet_index).strip() + if not normalized_sheet and normalized_sheet_idx in (None, ''): + cross_sheet_result = self._count_rows_across_sheets( + container, + blob_path, + filename, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + query_expression=query_expression, + normalize_match=normalize_match_flag, + ) + if cross_sheet_result is not None: + return cross_sheet_result + + selected_sheet, workbook_metadata = self._resolve_sheet_selection( + container, + blob_path, + sheet_name=sheet_name, + sheet_index=sheet_index, + require_explicit_sheet=True, + ) + df = self._read_tabular_blob_to_dataframe( + container, + blob_path, + sheet_name=selected_sheet, + require_explicit_sheet=True, + ) + df = self._try_numeric_conversion(df) + + try: + filtered_df, applied_filters = self._apply_optional_dataframe_filters( + df, + query_expression=query_expression, + filter_column=filter_column, + filter_operator=filter_operator, + filter_value=filter_value, + normalize_match=normalize_match_flag, + ) + except KeyError as missing_column_error: + missing_column = 
str(missing_column_error).strip("'") + return json.dumps( + self._build_missing_column_error_payload( + container, + blob_path, + filename, + workbook_metadata, + selected_sheet, + missing_column, + available_columns=list(df.columns), + ) + ) + except Exception as query_error: + return json.dumps({ + 'error': f"Query/filter error: {query_error}", + 'filename': filename, + 'selected_sheet': selected_sheet if workbook_metadata.get('is_workbook') else None, + }) + + return json.dumps({ + 'filename': filename, + 'selected_sheet': selected_sheet if workbook_metadata.get('is_workbook') else None, + 'rows_scanned': len(df), + 'row_count': len(filtered_df), + 'filter_applied': applied_filters, + 'normalize_match': normalize_match_flag, + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error counting rows: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + + return await asyncio.to_thread(_sync_work) + @kernel_function( description=( "Execute an aggregation operation on a column of a tabular file. " @@ -1447,6 +2611,7 @@ async def filter_rows( column: Annotated[str, "The column to filter on"], operator: Annotated[str, "Operator: ==, !=, >, <, >=, <=, contains, startswith, endswith"], value: Annotated[str, "The value to compare against"], + normalize_match: Annotated[str, "Whether to normalize string/entity matching for text comparisons (true/false)"] = "false", sheet_name: Annotated[Optional[str], "Optional worksheet name for Excel files. Required for analytical calls on multi-sheet workbooks unless sheet_index is provided."] = None, sheet_index: Annotated[Optional[str], "Optional zero-based worksheet index for Excel files. 
Ignored when sheet_name is provided."] = None, source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", @@ -1457,6 +2622,7 @@ async def filter_rows( """Filter rows based on a condition.""" def _sync_work(): try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=False) container, blob_path = self._resolve_blob_location_with_fallback( user_id, conversation_id, filename, source, group_id=group_id, public_workspace_id=public_workspace_id @@ -1467,6 +2633,7 @@ def _sync_work(): if not normalized_sheet and normalized_sheet_idx in (None, ''): cross_sheet_result = self._filter_rows_across_sheets( container, blob_path, filename, column, operator, value, + normalize_match=normalize_match_flag, max_rows=int(max_rows), ) if cross_sheet_result is not None: @@ -1487,42 +2654,26 @@ def _sync_work(): df = self._try_numeric_conversion(df) if column not in df.columns: - return json.dumps({"error": f"Column '{column}' not found. Available: {list(df.columns)}"}) - - series = df[column] - op = operator.strip().lower() + return json.dumps( + self._build_missing_column_error_payload( + container, + blob_path, + filename, + workbook_metadata, + selected_sheet, + column, + available_columns=list(df.columns), + ) + ) - numeric_value = None try: - numeric_value = float(value) - except (ValueError, TypeError): - pass - - if op == '==' or op == 'equals': - if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): - mask = series == numeric_value - else: - mask = series.astype(str).str.lower() == value.lower() - elif op == '!=': - if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): - mask = series != numeric_value - else: - mask = series.astype(str).str.lower() != value.lower() - elif op == '>': - mask = series > numeric_value - elif op == '<': - mask = series < numeric_value - elif op == '>=': - mask = series >= numeric_value - elif op == '<=': - mask = series <= numeric_value - elif op == 
'contains': - mask = series.astype(str).str.contains(value, case=False, na=False) - elif op == 'startswith': - mask = series.astype(str).str.lower().str.startswith(value.lower()) - elif op == 'endswith': - mask = series.astype(str).str.lower().str.endswith(value.lower()) - else: + mask = self._build_series_match_mask( + df[column], + operator, + value, + normalize_match=normalize_match_flag, + ) + except ValueError: return json.dumps({"error": f"Unsupported operator: {operator}"}) limit = int(max_rows) @@ -1530,6 +2681,7 @@ def _sync_work(): return json.dumps({ "filename": filename, "selected_sheet": selected_sheet if workbook_metadata.get('is_workbook') else None, + "normalize_match": normalize_match_flag, "total_matches": int(mask.sum()), "returned_rows": len(filtered), "data": filtered.to_dict(orient='records') @@ -1607,6 +2759,156 @@ def _sync_work(): return json.dumps({"error": f"Query error: {str(e)}. Ensure column names and values are correct."}) return await asyncio.to_thread(_sync_work) + @kernel_function( + description=( + "Filter rows in one worksheet where the target match column belongs to a cohort defined by values from another worksheet. " + "Use this for relational workbook questions such as facts owned by a cohort or records tied to a reference sheet membership list." 
+ ), + name="filter_rows_by_related_values" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def filter_rows_by_related_values( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + source_sheet_name: Annotated[str, "Worksheet that defines the cohort or reference values"], + source_value_column: Annotated[str, "Column on the source worksheet that contains the canonical cohort values"], + target_sheet_name: Annotated[str, "Worksheet containing the fact rows to filter"], + target_match_column: Annotated[str, "Column on the target worksheet that should match the source cohort values"], + source_query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to narrow the source cohort"] = None, + source_filter_column: Annotated[Optional[str], "Optional source-sheet filter column"] = None, + source_filter_operator: Annotated[str, "Optional source-sheet filter operator"] = "equals", + source_filter_value: Annotated[Optional[str], "Optional source-sheet filter value"] = None, + target_query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to narrow target rows before matching"] = None, + target_filter_column: Annotated[Optional[str], "Optional target-sheet filter column"] = None, + target_filter_operator: Annotated[str, "Optional target-sheet filter operator"] = "equals", + target_filter_value: Annotated[Optional[str], "Optional target-sheet filter value"] = None, + source_alias_column: Annotated[Optional[str], "Optional alternate or alias source column used for normalized matching"] = None, + target_alias_column: Annotated[Optional[str], "Optional alternate or alias target column used for normalized matching"] = None, + normalize_match: Annotated[str, "Whether to normalize entity-style text matching across 
worksheets (true/false)"] = "true", + source_sheet_index: Annotated[Optional[str], "Optional zero-based source worksheet index if sheet name is not used"] = None, + target_sheet_index: Annotated[Optional[str], "Optional zero-based target worksheet index if sheet name is not used"] = None, + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_rows: Annotated[str, "Maximum related target rows to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result containing explainable set-membership filtering output"]: + """Filter target rows by membership in a source-sheet cohort.""" + def _sync_work(): + try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=True) + container, blob_path = self._resolve_blob_location_with_fallback( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + result_payload = self._evaluate_related_value_membership( + container, + blob_path, + filename, + source_sheet_name=source_sheet_name, + source_value_column=source_value_column, + target_sheet_name=target_sheet_name, + target_match_column=target_match_column, + source_sheet_index=source_sheet_index, + target_sheet_index=target_sheet_index, + source_query_expression=source_query_expression, + source_filter_column=source_filter_column, + source_filter_operator=source_filter_operator, + source_filter_value=source_filter_value, + target_query_expression=target_query_expression, + target_filter_column=target_filter_column, + target_filter_operator=target_filter_operator, + target_filter_value=target_filter_value, + source_alias_column=source_alias_column, + target_alias_column=target_alias_column, + normalize_match=normalize_match_flag, + max_rows=int(max_rows), + ) + return 
json.dumps(result_payload, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error filtering rows by related values: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Count rows in one worksheet where the target match column belongs to a cohort defined by another worksheet. " + "Use this for deterministic numerator/denominator calculations and percentages across related sheets." + ), + name="count_rows_by_related_values" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def count_rows_by_related_values( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + source_sheet_name: Annotated[str, "Worksheet that defines the cohort or reference values"], + source_value_column: Annotated[str, "Column on the source worksheet that contains the canonical cohort values"], + target_sheet_name: Annotated[str, "Worksheet containing the fact rows to count"], + target_match_column: Annotated[str, "Column on the target worksheet that should match the source cohort values"], + source_query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to narrow the source cohort"] = None, + source_filter_column: Annotated[Optional[str], "Optional source-sheet filter column"] = None, + source_filter_operator: Annotated[str, "Optional source-sheet filter operator"] = "equals", + source_filter_value: Annotated[Optional[str], "Optional source-sheet filter value"] = None, + target_query_expression: Annotated[Optional[str], "Optional pandas DataFrame.query() expression to narrow target rows before matching"] = None, + target_filter_column: Annotated[Optional[str], "Optional target-sheet filter column"] = None, + 
target_filter_operator: Annotated[str, "Optional target-sheet filter operator"] = "equals", + target_filter_value: Annotated[Optional[str], "Optional target-sheet filter value"] = None, + source_alias_column: Annotated[Optional[str], "Optional alternate or alias source column used for normalized matching"] = None, + target_alias_column: Annotated[Optional[str], "Optional alternate or alias target column used for normalized matching"] = None, + normalize_match: Annotated[str, "Whether to normalize entity-style text matching across worksheets (true/false)"] = "true", + source_sheet_index: Annotated[Optional[str], "Optional zero-based source worksheet index if sheet name is not used"] = None, + target_sheet_index: Annotated[Optional[str], "Optional zero-based target worksheet index if sheet name is not used"] = None, + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result containing an explainable relational row count"]: + """Count target rows by membership in a source-sheet cohort.""" + def _sync_work(): + try: + normalize_match_flag = self._parse_boolean_argument(normalize_match, default=True) + container, blob_path = self._resolve_blob_location_with_fallback( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + result_payload = self._evaluate_related_value_membership( + container, + blob_path, + filename, + source_sheet_name=source_sheet_name, + source_value_column=source_value_column, + target_sheet_name=target_sheet_name, + target_match_column=target_match_column, + source_sheet_index=source_sheet_index, + target_sheet_index=target_sheet_index, + source_query_expression=source_query_expression, + source_filter_column=source_filter_column, + 
source_filter_operator=source_filter_operator, + source_filter_value=source_filter_value, + target_query_expression=target_query_expression, + target_filter_column=target_filter_column, + target_filter_operator=target_filter_operator, + target_filter_value=target_filter_value, + source_alias_column=source_alias_column, + target_alias_column=target_alias_column, + normalize_match=normalize_match_flag, + max_rows=50, + ) + if 'error' not in result_payload: + result_payload['row_count'] = result_payload.get('matched_target_row_count', 0) + result_payload.pop('data', None) + result_payload.pop('returned_rows', None) + result_payload.pop('rows_limited', None) + return json.dumps(result_payload, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error counting rows by related values: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + + return await asyncio.to_thread(_sync_work) + @kernel_function( description=( "Perform a group-by aggregation on a tabular file. 
" diff --git a/application/single_app/static/js/admin/admin_plugins.js b/application/single_app/static/js/admin/admin_plugins.js index ad497f62..97dc39ec 100644 --- a/application/single_app/static/js/admin/admin_plugins.js +++ b/application/single_app/static/js/admin/admin_plugins.js @@ -1,6 +1,6 @@ // admin_plugins.js (updated to use new multi-step modal) import { showToast } from "../chat/chat-toast.js" -import { renderPluginsTable as sharedRenderPluginsTable, validatePluginManifest as sharedValidatePluginManifest } from "../plugin_common.js"; +import { renderPluginsTable as sharedRenderPluginsTable, validatePluginManifest as sharedValidatePluginManifest, getErrorMessageFromResponse } from "../plugin_common.js"; // Main logic document.addEventListener('DOMContentLoaded', function () { @@ -65,9 +65,11 @@ function setupSaveHandler(plugin, modal) { const formData = window.pluginModalStepper.getFormData(); // Validate with JSON schema - const valid = await sharedValidatePluginManifest(formData); - if (!valid) { - window.pluginModalStepper.showError('Validation error: Invalid action data.'); + const validation = await sharedValidatePluginManifest(formData); + const validationFailed = validation === false || (validation && validation.valid === false); + if (validationFailed) { + const message = validation?.errors?.join('\n') || 'Validation error: Invalid action data.'; + window.pluginModalStepper.showError(message); return; } @@ -118,8 +120,8 @@ async function savePlugin(pluginData, existingPlugin = null) { }); if (!saveRes.ok) { - const errorText = await saveRes.text(); - throw new Error(`Failed to save action: ${errorText}`); + const errorMessage = await getErrorMessageFromResponse(saveRes, 'Failed to save action'); + throw new Error(errorMessage); } } diff --git a/application/single_app/static/js/admin/admin_plugins_new.js b/application/single_app/static/js/admin/admin_plugins_new.js index 93c7a926..9d378d43 100644 --- 
a/application/single_app/static/js/admin/admin_plugins_new.js +++ b/application/single_app/static/js/admin/admin_plugins_new.js @@ -1,6 +1,6 @@ // admin_plugins.js (updated to use new multi-step modal) import { showToast } from "../chat/chat-toast.js" -import { renderPluginsTable as sharedRenderPluginsTable, validatePluginManifest as sharedValidatePluginManifest } from "../plugin_common.js"; +import { renderPluginsTable as sharedRenderPluginsTable, validatePluginManifest as sharedValidatePluginManifest, getErrorMessageFromResponse } from "../plugin_common.js"; // Main logic document.addEventListener('DOMContentLoaded', function () { @@ -61,9 +61,11 @@ function setupSaveHandler(plugin, modal) { const formData = window.pluginModalStepper.getFormData(); // Validate with JSON schema - const valid = await sharedValidatePluginManifest(formData); - if (!valid) { - window.pluginModalStepper.showError('Validation error: Invalid action data.'); + const validation = await sharedValidatePluginManifest(formData); + const validationFailed = validation === false || (validation && validation.valid === false); + if (validationFailed) { + const message = validation?.errors?.join('\n') || 'Validation error: Invalid action data.'; + window.pluginModalStepper.showError(message); return; } @@ -103,8 +105,8 @@ async function savePlugin(pluginData, existingPlugin = null) { }); if (!saveRes.ok) { - const errorText = await saveRes.text(); - throw new Error(`Failed to save action: ${errorText}`); + const errorMessage = await getErrorMessageFromResponse(saveRes, 'Failed to save action'); + throw new Error(errorMessage); } } diff --git a/application/single_app/static/js/chat/chat-model-selector.js b/application/single_app/static/js/chat/chat-model-selector.js index f5216d96..3c4c5daa 100644 --- a/application/single_app/static/js/chat/chat-model-selector.js +++ b/application/single_app/static/js/chat/chat-model-selector.js @@ -209,6 +209,23 @@ function getSelectionSnapshot() { }; } +function 
restoreLegacyPreferredModelSelection(preferredModelDeployment) { + if (!modelSelect || !preferredModelDeployment) { + return; + } + + const matchingOption = Array.from(modelSelect.options).find(option => ( + option.value === preferredModelDeployment + || option.dataset.deploymentName === preferredModelDeployment + )); + + if (!matchingOption || matchingOption.disabled) { + return; + } + + modelSelect.value = matchingOption.value; +} + function resolveSelectedSelectionKey(options, restoreOptions = {}) { const { currentSelection = null, @@ -486,6 +503,7 @@ export async function populateModelDropdown(restoreOptions = {}) { } if (!window.appSettings?.enable_multi_model_endpoints) { + restoreLegacyPreferredModelSelection(restoreOptions.preferredModelDeployment || null); modelSelectorController?.refresh(); return; } diff --git a/application/single_app/static/js/plugin_common.js b/application/single_app/static/js/plugin_common.js index 29a88a24..e50278d4 100644 --- a/application/single_app/static/js/plugin_common.js +++ b/application/single_app/static/js/plugin_common.js @@ -331,6 +331,20 @@ export async function validatePluginManifest(pluginManifest) { return await validatePluginManifestServerSide(pluginManifest); } +export async function getErrorMessageFromResponse(response, fallbackMessage = 'Request failed') { + const responseText = await response.text(); + if (!responseText) { + return fallbackMessage; + } + + try { + const errorData = JSON.parse(responseText); + return errorData.error || responseText; + } catch (error) { + return responseText; + } +} + // Server-side validation fallback async function validatePluginManifestServerSide(pluginManifest) { try { diff --git a/application/single_app/static/js/workspace/workspace_model_endpoints.js b/application/single_app/static/js/workspace/workspace_model_endpoints.js index 3f9e3337..612ea15d 100644 --- a/application/single_app/static/js/workspace/workspace_model_endpoints.js +++ 
b/application/single_app/static/js/workspace/workspace_model_endpoints.js @@ -53,6 +53,8 @@ const modelsListEl = document.getElementById("model-endpoint-models-list"); const addModelBtn = document.getElementById("model-endpoint-add-model-btn"); const scope = window.modelEndpointScope || "user"; +const endpointsContainerId = scope === "group" ? "group-multi-endpoint-configuration" : "workspace-multi-endpoint-configuration"; +const endpointsContainer = document.getElementById(endpointsContainerId); const endpointsApi = scope === "group" ? "/api/group/model-endpoints" : "/api/user/model-endpoints"; const modelsFetchApi = scope === "group" ? "/api/group/models/fetch" : "/api/user/models/fetch"; const modelsTestApi = scope === "group" ? "/api/group/models/test-model" : "/api/user/models/test-model"; @@ -60,6 +62,21 @@ const modelsTestApi = scope === "group" ? "/api/group/models/test-model" : "/api let workspaceEndpoints = Array.isArray(window.workspaceModelEndpoints) ? [...window.workspaceModelEndpoints] : []; let modalModels = []; +function hasEndpointManagementUi() { + return Boolean(endpointsWrapper && endpointsTbody); +} + +function hideEndpointManagementUi() { + if (endpointsContainer) { + endpointsContainer.classList.add("d-none"); + } +} + +function isEndpointsFeatureDisabled(error) { + const message = typeof error?.message === "string" ? error.message.toLowerCase() : ""; + return message.includes("custom endpoints") && message.includes("is disabled"); +} + function generateId() { if (window.crypto && window.crypto.randomUUID) { return window.crypto.randomUUID(); @@ -620,6 +637,10 @@ function escapeHtml(value) { } async function loadEndpoints() { + if (!hasEndpointManagementUi()) { + return; + } + try { const response = await fetch(endpointsApi); const payload = await response.json().catch(() => ({})); @@ -629,12 +650,24 @@ async function loadEndpoints() { workspaceEndpoints = Array.isArray(payload.endpoints) ? 
payload.endpoints : []; renderEndpoints(); } catch (error) { + if (isEndpointsFeatureDisabled(error)) { + console.info("[WorkspaceEndpoints] Endpoint management is disabled; skipping endpoint load."); + workspaceEndpoints = []; + renderEndpoints(); + hideEndpointManagementUi(); + return; + } + console.error("Failed to load endpoints", error); showToast(error.message || "Failed to load endpoints.", "danger"); } } function initialize() { + if (!hasEndpointManagementUi()) { + return; + } + if (enableMultiEndpointToggle) { enableMultiEndpointToggle.checked = Boolean(window.enableMultiModelEndpoints); } diff --git a/application/single_app/static/js/workspace/workspace_plugins.js b/application/single_app/static/js/workspace/workspace_plugins.js index 8ed4f6b5..67a7a966 100644 --- a/application/single_app/static/js/workspace/workspace_plugins.js +++ b/application/single_app/static/js/workspace/workspace_plugins.js @@ -1,5 +1,5 @@ // workspace_plugins.js (refactored to use plugin_common.js and new multi-step modal) -import { renderPluginsTable, renderPluginsGrid, ensurePluginsTableInRoot, validatePluginManifest } from '../plugin_common.js'; +import { renderPluginsTable, renderPluginsGrid, ensurePluginsTableInRoot, validatePluginManifest, getErrorMessageFromResponse } from '../plugin_common.js'; import { showToast } from "../chat/chat-toast.js" import { setupViewToggle, switchViewContainers, openViewModal @@ -136,9 +136,11 @@ function setupSaveHandler(plugin, modal) { const formData = window.pluginModalStepper.getFormData(); // Validate with JSON schema - const valid = await validatePluginManifest(formData); - if (!valid) { - window.pluginModalStepper.showError('Validation error: Invalid action data.'); + const validation = await validatePluginManifest(formData); + const validationFailed = validation === false || (validation && validation.valid === false); + if (validationFailed) { + const message = validation?.errors?.join('\n') || 'Validation error: Invalid action data.'; 
+ window.pluginModalStepper.showError(message); return; } @@ -208,7 +210,8 @@ async function savePlugin(pluginData, existingPlugin = null) { }); if (!saveRes.ok) { - throw new Error('Failed to save action'); + const errorMessage = await getErrorMessageFromResponse(saveRes, 'Failed to save action'); + throw new Error(errorMessage); } } diff --git a/application/single_app/static/js/workspace/workspace_plugins_new.js b/application/single_app/static/js/workspace/workspace_plugins_new.js index 509e53e8..3a213ebb 100644 --- a/application/single_app/static/js/workspace/workspace_plugins_new.js +++ b/application/single_app/static/js/workspace/workspace_plugins_new.js @@ -1,5 +1,5 @@ // workspace_plugins.js (refactored to use plugin_common.js and new multi-step modal) -import { renderPluginsTable, ensurePluginsTableInRoot, validatePluginManifest } from '../plugin_common.js'; +import { renderPluginsTable, ensurePluginsTableInRoot, validatePluginManifest, getErrorMessageFromResponse } from '../plugin_common.js'; import { showToast } from "../chat/chat-toast.js" const root = document.getElementById('workspace-plugins-root'); @@ -68,9 +68,11 @@ function setupSaveHandler(plugin, modal) { const formData = window.pluginModalStepper.getFormData(); // Validate with JSON schema - const valid = await validatePluginManifest(formData); - if (!valid) { - window.pluginModalStepper.showError('Validation error: Invalid action data.'); + const validation = await validatePluginManifest(formData); + const validationFailed = validation === false || (validation && validation.valid === false); + if (validationFailed) { + const message = validation?.errors?.join('\n') || 'Validation error: Invalid action data.'; + window.pluginModalStepper.showError(message); return; } @@ -118,7 +120,8 @@ async function savePlugin(pluginData, existingPlugin = null) { }); if (!saveRes.ok) { - throw new Error('Failed to save action'); + const errorMessage = await getErrorMessageFromResponse(saveRes, 'Failed to save 
action'); + throw new Error(errorMessage); } } diff --git a/application/single_app/templates/_agent_modal.html b/application/single_app/templates/_agent_modal.html index a859aed9..ef7fefc5 100644 --- a/application/single_app/templates/_agent_modal.html +++ b/application/single_app/templates/_agent_modal.html @@ -1,5 +1,5 @@ -