Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions arcflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,73 @@ def _get_nontarget_agent_criteria(self, modified_since=0):

return criteria

def _validate_solr_query_part(self, query_part):
"""
Validate a Solr query part to prevent injection attacks.

This validation protects against both shell command injection and
Solr-specific query injection attacks. While the current code only uses
internally-controlled strings, this defense-in-depth approach prevents
future security issues if user-controlled data is added.

Args:
query_part (str): A single query part to validate

Returns:
bool: True if the query part appears safe, False otherwise
"""
if not isinstance(query_part, str):
return False

# Reject empty or whitespace-only strings
if not query_part.strip():
return False

# Check for suspicious patterns that could indicate injection attempts
# This includes both shell injection and Solr-specific patterns
suspicious_patterns = [
# Shell/command injection patterns
r';\s*\w', # Semicolon followed by word character
r'\$\{', # Variable interpolation attempts (e.g., Log4Shell)
r'`', # Command substitution
r'\|\|', # Shell command chaining (|| operator)
r'&&\s*[a-zA-Z]', # Shell command chaining (but allow Solr AND operator)

# Solr-specific injection patterns
r'\{!', # LocalParams injection attempts (e.g., {!type=xmlparser})
r'<script', # XSS attempts
r'javascript:', # JavaScript protocol handler
r'<\s*iframe', # iframe injection
]

for pattern in suspicious_patterns:
if re.search(pattern, query_part, re.IGNORECASE):
self.log.error(f"Rejected suspicious query part: {query_part}")
return False

return True

def _validate_solr_field_name(self, field_name):
"""
Validate a Solr field name to prevent injection attacks.

Args:
field_name (str): A field name to validate

Returns:
bool: True if the field name appears safe, False otherwise
"""
if not isinstance(field_name, str):
return False

# Field names should only contain alphanumeric characters, underscores, and hyphens
# This is stricter than Solr's actual requirements but provides better security
if not re.match(r'^[a-zA-Z0-9_-]+$', field_name):
self.log.error(f"Rejected invalid field name: {field_name}")
return False

return True

def _execute_solr_query(self, query_parts, solr_url=None, fields=['id'], indent_size=0):
"""
A generic function to execute a query against the Solr index.
Expand All @@ -765,6 +832,18 @@ def _execute_solr_query(self, query_parts, solr_url=None, fields=['id'], indent_
if not solr_url:
solr_url = self.solr_url

# Validate all query parts before constructing the query
for part in query_parts:
if not self._validate_solr_query_part(part):
self.log.error(f"Query validation failed for: {part}")
return []

# Validate all field names
for field in fields:
if not self._validate_solr_field_name(field):
self.log.error(f"Field name validation failed for: {field}")
return []

query_string = " AND ".join(query_parts)
self.log.info(f"{indent}Executing Solr query: {query_string}")

Expand Down