Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion sigma/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from dataclasses import dataclass, field
from abc import ABC
import copy
from functools import lru_cache
import re
from sigma.processing.tracking import ProcessingItemTrackingMixin
from pyparsing import (
Expand All @@ -12,11 +14,15 @@
opAssoc,
ParseResults,
ParseException,
ParserElement,
)
from typing import ClassVar, Type, cast, TYPE_CHECKING
from sigma.types import SigmaType
from sigma.exceptions import SigmaConditionError, SigmaRuleLocation

# Enable packrat parsing (memoization of intermediate parse results) for faster
# parsing of complex condition expressions. The memo cache is capped at 128 entries.
# NOTE(review): this is invoked on the ParserElement class at import time, so it
# presumably applies to every pyparsing grammar in the process, not only the
# Sigma condition grammar -- confirm this is acceptable for library consumers.
ParserElement.enable_packrat(cache_size_limit=128)

if TYPE_CHECKING:
from sigma.rule.detection import SigmaDetection, SigmaDetectionItem, SigmaDetections

Expand Down Expand Up @@ -282,6 +288,17 @@ class ConditionValueExpression(ParentChainMixin):
)


@lru_cache(maxsize=256)
def _parse_condition_string(
    condition_str: str,
) -> ConditionItem:
    """Parse a condition expression with pyparsing and memoize the result per string.

    Up to 256 distinct condition strings are kept in an LRU cache, so the
    returned object is shared between all callers passing the same string.

    Callers must deep-copy the returned result since postprocessing mutates the parse tree.
    """
    # parse_all=True makes the grammar consume the complete string instead of
    # accepting a valid prefix.
    parse_results = condition.parse_string(condition_str, parse_all=True)
    # The first element of the parse results is the root of the condition tree.
    root_item = parse_results[0]
    return cast(ConditionItem, root_item)


@dataclass
class SigmaCondition(ProcessingItemTrackingMixin):
condition: str
Expand All @@ -304,7 +321,8 @@ def parse(
"The pipe syntax in Sigma conditions has been deprecated and replaced by Sigma correlations. pySigma doesn't supports this syntax."
)
try:
parsed = cast(ConditionItem, condition.parse_string(self.condition, parse_all=True)[0])
# Use cached parse result, deep-copied since postprocessing mutates the tree
parsed = copy.deepcopy(_parse_condition_string(self.condition))
if postprocess:
return parsed.postprocess(self.detections, source=self.source)
else:
Expand Down
15 changes: 14 additions & 1 deletion sigma/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,22 @@ def __init__(
self.applied_modifiers = applied_modifiers
self.source = source

# Cache of the resolved type hint for the 'val' parameter of modify(), keyed by the
# concrete modifier class. Lookups go through SigmaModifier._type_hint_cache, so
# this single mapping is shared by all modifier classes.
_type_hint_cache: ClassVar[dict[type, Any]] = {}

def _get_modify_type_hint(self) -> Any:
    """Return the annotation of the 'val' parameter of modify(), memoized per class.

    The hint is resolved with get_type_hints() the first time a given concrete
    class is seen and afterwards served from SigmaModifier._type_hint_cache.
    """
    hint_cache = SigmaModifier._type_hint_cache
    modifier_cls = type(self)
    if modifier_cls not in hint_cache:
        # First lookup for this class: resolve the annotation once and remember it.
        hint_cache[modifier_cls] = get_type_hints(self.modify)["val"]
    return hint_cache[modifier_cls]

def type_check(self, val: Any, explicit_type: Type[Any] | None = None) -> bool:
th = (
explicit_type or get_type_hints(self.modify)["val"]
explicit_type or self._get_modify_type_hint()
) # get type annotation from val parameter of apply method or explicit_type parameter
if th is Any:
return True
Expand Down
81 changes: 49 additions & 32 deletions sigma/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,20 +338,14 @@ def replace_with_placeholder(

def _merge_strs(self) -> "SigmaString":
"""Merge consecutive plain strings in self.s."""
src = list(reversed(self.s))
res: list[SigmaStringPartType] = []
while src:
item = src.pop()
try:
if isinstance(res[-1], str) and isinstance(
item, str
): # append current item to last result element if both are strings
res[-1] += item
else:
res.append(item)
except IndexError: # first element
if not self.s:
return self
res: list[SigmaStringPartType] = [self.s[0]]
for item in self.s[1:]:
if isinstance(res[-1], str) and isinstance(item, str):
res[-1] += item
else:
res.append(item)

self.s = res
return self

Expand Down Expand Up @@ -453,7 +447,7 @@ def endswith(self, val: SigmaStringPartType) -> bool:

def contains_special(self) -> bool:
    """Check if string contains special characters.

    Returns:
        True if at least one part of self.s is a SpecialChars instance,
        otherwise False (including when self.s is empty).
    """
    # A generator expression lets any() stop at the first match instead of
    # building a full list of booleans first.
    return any(isinstance(item, SpecialChars) for item in self.s)

def contains_placeholder(
self, include: list[str] | None = None, exclude: list[str] | None = None
Expand Down Expand Up @@ -575,40 +569,56 @@ def convert(
Setting one of the wildcard or multiple parameters to None indicates that this feature is not supported. Appearance
of these characters in a string will raise a SigmaValueError.
"""
s = ""
result = []
escaped_chars = frozenset((wildcard_multi or "") + (wildcard_single or "") + add_escaped)

for c in iter(self):
if isinstance(c, str): # c is plain character
if c in filter_chars: # Skip filtered characters
continue
if c in escaped_chars:
s += escape_char
s += c
elif isinstance(c, SpecialChars): # special handling for special characters
if c == SpecialChars.WILDCARD_MULTI:
filter_set = frozenset(filter_chars)

for part in self.s:
if isinstance(part, str): # part is a plain string segment
if not filter_set and not escaped_chars:
# Fast path: no escaping or filtering needed
result.append(part)
elif not filter_set and escaped_chars:
# Only escaping needed, process character-by-character only if necessary
if any(c in escaped_chars for c in part):
for c in part:
if c in escaped_chars and escape_char is not None:
result.append(escape_char)
result.append(c)
else:
result.append(part)
else:
# Both filtering and escaping
for c in part:
if c in filter_set:
continue
if c in escaped_chars and escape_char is not None:
result.append(escape_char)
result.append(c)
elif isinstance(part, SpecialChars): # special handling for special characters
if part == SpecialChars.WILDCARD_MULTI:
if wildcard_multi is not None:
s += wildcard_multi
result.append(wildcard_multi)
else:
raise SigmaValueError(
"Multi-character wildcard not specified for conversion"
)
elif c == SpecialChars.WILDCARD_SINGLE:
elif part == SpecialChars.WILDCARD_SINGLE:
if wildcard_single is not None:
s += wildcard_single
result.append(wildcard_single)
else:
raise SigmaValueError(
"Single-character wildcard not specified for conversion"
)
elif isinstance(c, Placeholder):
elif isinstance(part, Placeholder):
raise SigmaPlaceholderError(
f"Attempt to convert unhandled placeholder '{c.name}' into query."
f"Attempt to convert unhandled placeholder '{part.name}' into query."
)
else:
raise SigmaValueError(
f"Trying to convert SigmaString containing part of type '{type(c).__name__}'"
f"Trying to convert SigmaString containing part of type '{type(part).__name__}'"
)
return s
return "".join(result)

def to_regex(self, custom_escaped: str = "") -> "SigmaRegularExpression":
"""Convert SigmaString into a regular expression."""
Expand Down Expand Up @@ -1014,6 +1024,13 @@ class SigmaExpansion(NoPlainConversionMixin, SigmaType):

def sigma_type(v: (int | float | str | bool) | None) -> SigmaType:
"""Return Sigma type from Python value"""
# Check bool before int since bool is a subclass of int in Python
if isinstance(v, bool):
return SigmaBool(v)
matched = type_map.get(type(v))
if matched is not None:
return matched(v)
# Fallback to isinstance checks for subclasses
for t, st in type_map.items():
if isinstance(v, t):
return st(v)
Expand Down
Loading