Fix declarative object parsing bug

This commit is contained in:
Peter Ibekwe
2026-06-10 14:13:52 -07:00
Unverified
parent 3753d938f5
commit 564259a4aa
3 changed files with 282 additions and 11 deletions
@@ -63,6 +63,9 @@ logger = logging.getLogger(__name__)
_ENV_REFERENCE_RE = re.compile(r"\bEnv\.([A-Za-z_][A-Za-z0-9_]*)")
# Allowed identifier shape for object-attribute steps in declarative state paths
# (matches PowerFx / Copilot Studio identifier rules).
_SAFE_PATH_SEGMENT_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")
@dataclass(frozen=True)
class DeclarativeEnvConfig:
@@ -331,16 +334,21 @@ class DeclarativeWorkflowState:
def get(self, path: str, default: Any = None) -> Any:
"""Get a value from the state using a dot-notated path.
Dict-keyed segments may use arbitrary string keys (e.g. UUIDs in
``System.conversations.<id>.messages``). Segments that would resolve
via object-attribute access must be valid declarative identifiers
(``[A-Za-z][A-Za-z0-9_]*``); other shapes return ``default``.
Args:
path: Dot-notated path like 'Local.results' or 'Workflow.Inputs.query'
default: Default value if path doesn't exist
Returns:
The value at the path, or default if not found
The value at the path, or default if not found or unreachable.
"""
state_data = self.get_state_data()
parts = path.split(".")
if not parts:
if not parts or any(not p for p in parts):
return default
namespace = parts[0]
@@ -377,10 +385,19 @@ class DeclarativeWorkflowState:
obj = obj.get(part, default) # type: ignore[union-attr]
if obj is default:
return default
elif hasattr(obj, part): # type: ignore[arg-type]
obj = getattr(obj, part) # type: ignore[arg-type]
else:
return default
# Attribute access is only allowed for safe declarative identifiers.
if not _SAFE_PATH_SEGMENT_RE.match(part):
logger.warning(
"DeclarativeWorkflowState.get: rejecting attribute segment %r in path %r",
part,
path,
)
return default
if hasattr(obj, part): # type: ignore[arg-type]
obj = getattr(obj, part) # type: ignore[arg-type]
else:
return default
return obj # type: ignore[return-value]
@@ -392,7 +409,7 @@ class DeclarativeWorkflowState:
value: The value to set
Raises:
ValueError: If attempting to set Workflow.Inputs (which is read-only)
ValueError: If attempting to set Workflow.Inputs (which is read-only).
"""
state_data = self.get_state_data()
parts = path.split(".")
@@ -692,7 +709,7 @@ class DeclarativeWorkflowState:
if isinstance(replacement, str):
if len(replacement) > MAX_INLINE_LENGTH:
# Store long strings in a temp variable to avoid PowerFx expression limit
temp_var_name = f"_TempMessageText{temp_var_counter}"
temp_var_name = f"TempMessageText{temp_var_counter}"
temp_var_counter += 1
self.set(f"Local.{temp_var_name}", replacement)
replacement_str = f"Local.{temp_var_name}"
@@ -849,6 +866,9 @@ class DeclarativeWorkflowState:
def interpolate_string(self, text: str) -> str:
"""Interpolate {Variable.Path} references in a string.
Matched path segments must be valid declarative identifiers
(``[A-Za-z][A-Za-z0-9_]*``); other braced tokens are left as-is.
This handles template-style variable substitution like:
- "Created ticket #{Local.TicketParameters.TicketId}"
- "Routing to {Local.RoutingParameters.TeamName}"
@@ -866,8 +886,8 @@ class DeclarativeWorkflowState:
value = self.get(var_path)
return str(value) if value is not None else ""
# Match {Variable.Path} patterns
pattern = r"\{([A-Za-z][A-Za-z0-9_.]*)\}"
# Match {Variable.Path} patterns where each segment is a declarative identifier.
pattern = r"\{([A-Za-z][A-Za-z0-9_]*(?:\.[A-Za-z][A-Za-z0-9_]*)*)\}"
# Replace all matches
result = text
@@ -0,0 +1,251 @@
# Copyright (c) Microsoft. All rights reserved.
# pyright: reportUnknownParameterType=false, reportUnknownArgumentType=false
# pyright: reportMissingParameterType=false, reportUnknownMemberType=false
# pyright: reportPrivateUsage=false, reportUnknownVariableType=false
# pyright: reportGeneralTypeIssues=false
"""Path-segment validation tests for DeclarativeWorkflowState.
Path segments handed to ``get``/``set``/``append`` and ``{Variable.Path}``
placeholders in ``interpolate_string`` must be valid declarative identifiers
(``[A-Za-z][A-Za-z0-9_]*``). These tests pin that contract and the related
behavior across legitimate and invalid inputs.
"""
import logging
from dataclasses import dataclass
from typing import Any
from unittest.mock import MagicMock
import pytest
from agent_framework_declarative._workflows import DeclarativeWorkflowState
try:
import powerfx # noqa: F401
_powerfx_available = True
except (ImportError, RuntimeError):
_powerfx_available = False
_requires_powerfx = pytest.mark.skipif(not _powerfx_available, reason="PowerFx engine not available")
@pytest.fixture
def mock_state() -> MagicMock:
"""In-memory mock for the underlying State."""
ms = MagicMock()
ms._data = {}
def get(key: str, default: Any = None) -> Any:
return ms._data.get(key, default)
def set_(key: str, value: Any) -> None:
ms._data[key] = value
def has(key: str) -> bool:
return key in ms._data
def delete(key: str) -> None:
ms._data.pop(key, None)
ms.get = MagicMock(side_effect=get)
ms.set = MagicMock(side_effect=set_)
ms.has = MagicMock(side_effect=has)
ms.delete = MagicMock(side_effect=delete)
return ms
@pytest.fixture
def state(mock_state: MagicMock) -> DeclarativeWorkflowState:
s = DeclarativeWorkflowState(mock_state)
s.initialize()
return s
@dataclass
class _PlainObj:
"""Non-dict object so ``get`` falls through to attribute access."""
text: str = "hi"
# ---------------------------------------------------------------------------
# get(): invalid paths return default
# ---------------------------------------------------------------------------
class TestGetRejectsInvalidPaths:
def test_rejects_dunder_segment_via_attribute_access(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.obj", _PlainObj())
assert state.get("Local.obj.__class__") is None
assert state.get("Local.obj.__class__", default="DEF") == "DEF"
def test_rejects_full_env_exfil_chain(self, state: DeclarativeWorkflowState, monkeypatch) -> None:
sentinel = "agent-framework-path-safety-sentinel"
monkeypatch.setenv("AF_PATH_SAFETY_SENTINEL", sentinel)
state.set("Local.obj", _PlainObj())
result = state.get("Local.obj.__class__.__init__.__globals__.os.environ")
assert result is None
assert sentinel not in str(result)
def test_rejects_leading_underscore_via_attribute_access(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.obj", _PlainObj())
assert state.get("Local.obj._private") is None
def test_rejects_invalid_chars_via_attribute_access(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.obj", _PlainObj())
assert state.get("Local.obj.text bar") is None
assert state.get("Local.obj.text-bar") is None
def test_rejects_empty_path_and_empty_segments(self, state: DeclarativeWorkflowState) -> None:
assert state.get("") is None
assert state.get(".") is None
assert state.get("Local.") is None
assert state.get(".Local") is None
def test_warning_logged_on_rejected_attribute_segment(
self,
state: DeclarativeWorkflowState,
caplog: pytest.LogCaptureFixture,
) -> None:
state.set("Local.obj", _PlainObj())
with caplog.at_level(logging.WARNING, logger="agent_framework_declarative._workflows._declarative_base"):
state.get("Local.obj.__class__")
assert any("rejecting attribute segment" in r.message for r in caplog.records)
def test_dict_keyed_dunder_is_not_attribute_access(self, state: DeclarativeWorkflowState) -> None:
"""A literal dunder dict key is harmless because dict lookup never reaches getattr."""
state.set("Local.bag", {"__class__": "harmless-string"})
assert state.get("Local.bag.__class__") == "harmless-string"
# ---------------------------------------------------------------------------
# get(): legitimate paths continue to work
# ---------------------------------------------------------------------------
class TestGetAllowsValidPaths:
def test_underscore_inside_identifier(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.user_input", "ok")
assert state.get("Local.user_input") == "ok"
def test_mixed_case_identifiers(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.UserInput", "u1")
state.set("Local.userInput", "u2")
assert state.get("Local.UserInput") == "u1"
assert state.get("Local.userInput") == "u2"
def test_object_attribute_traversal_still_works(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.msg", _PlainObj(text="hello"))
assert state.get("Local.msg.text") == "hello"
def test_nested_dict_traversal_still_works(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.params", {"team": {"name": "alpha"}})
assert state.get("Local.params.team.name") == "alpha"
def test_uuid_and_hyphenated_dict_keys_are_allowed(self, state: DeclarativeWorkflowState) -> None:
"""Conversation-id style paths use arbitrary dict keys (UUIDs / hyphens)."""
conv_id = "eb815014-06f1-4db6-b7c1-304ea135424f"
state.set(f"System.conversations.{conv_id}.messages", ["m1", "m2"])
assert state.get(f"System.conversations.{conv_id}.messages") == ["m1", "m2"]
# ---------------------------------------------------------------------------
# set() / append(): dict-keyed operations accept arbitrary string keys
# ---------------------------------------------------------------------------
class TestSetAndAppend:
def test_set_allows_underscore_inside_identifier(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.user_input", "ok")
assert state.get("Local.user_input") == "ok"
def test_set_allows_uuid_and_hyphenated_dict_keys(self, state: DeclarativeWorkflowState) -> None:
conv_id = "conv-test-1"
state.set(f"System.conversations.{conv_id}.messages", [])
assert state.get(f"System.conversations.{conv_id}.messages") == []
def test_append_allows_uuid_and_hyphenated_dict_keys(self, state: DeclarativeWorkflowState) -> None:
conv_id = "conv-42"
state.append(f"System.conversations.{conv_id}.messages", {"role": "user", "text": "hi"})
msgs = state.get(f"System.conversations.{conv_id}.messages")
assert msgs == [{"role": "user", "text": "hi"}]
def test_workflow_inputs_still_read_only(self, state: DeclarativeWorkflowState) -> None:
with pytest.raises(ValueError, match="read-only"):
state.set("Workflow.Inputs.x", 1)
# ---------------------------------------------------------------------------
# interpolate_string(): invalid placeholders left intact, valid ones resolved
# ---------------------------------------------------------------------------
class TestInterpolateString:
def test_ignores_dunder_payload(self, state: DeclarativeWorkflowState, monkeypatch) -> None:
sentinel = "agent-framework-interp-sentinel"
monkeypatch.setenv("AF_INTERP_SENTINEL", sentinel)
state.set("Local.obj", _PlainObj())
out = state.interpolate_string("X={Local.obj.__class__.__init__.__globals__.os.environ}")
assert sentinel not in out
assert "{Local.obj.__class__" in out # placeholder left as literal text
def test_ignores_leading_underscore_segment(self, state: DeclarativeWorkflowState) -> None:
out = state.interpolate_string("v={Local._private}")
assert out == "v={Local._private}"
def test_allows_underscore_inside_identifier(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.user_input", "hello")
assert state.interpolate_string("v={Local.user_input}") == "v=hello"
def test_resolves_nested_dict_path(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.params", {"team": "alpha"})
assert state.interpolate_string("team={Local.params.team}") == "team=alpha"
def test_end_to_end_send_activity_literal_placeholder(
self,
state: DeclarativeWorkflowState,
monkeypatch,
) -> None:
"""Mirror the SendActivity flow: eval_if_expression then interpolate_string."""
sentinel = "agent-framework-e2e-sentinel"
monkeypatch.setenv("AF_E2E_SENTINEL", sentinel)
state.set("Local.toolResult", _PlainObj())
payload = "{Local.toolResult.__class__.__init__.__globals__.os.environ}"
evaluated = state.eval_if_expression(payload)
rendered = state.interpolate_string(evaluated) if isinstance(evaluated, str) else str(evaluated)
assert rendered == payload
assert sentinel not in rendered
# ---------------------------------------------------------------------------
# Regressions: PowerFx and internal temp-variable handling still work
# ---------------------------------------------------------------------------
@_requires_powerfx
class TestPowerFxStillWorks:
def test_simple_powerfx_expression_evaluates(self, state: DeclarativeWorkflowState) -> None:
state.set("Local.x", 6)
state.set("Local.y", 7)
assert state.eval("=Local.x * Local.y") == 42
def test_internal_temp_message_text_still_works(self, state: DeclarativeWorkflowState) -> None:
"""Long MessageText() results stored in TempMessageText{n} still round-trip."""
long_text = "A" * 600
state.set(
"Local.Messages",
[{"text": long_text, "contents": [{"type": "text", "text": long_text}]}],
)
result = state.eval("=Upper(MessageText(Local.Messages))")
assert result == "A" * 600
assert state.get("Local.TempMessageText0") == long_text
@@ -2761,7 +2761,7 @@ class TestLongMessageTextHandling:
assert result == "HELLO WORLD"
# No temp variable should be created for short strings
temp_var = state.get("Local._TempMessageText0")
temp_var = state.get("Local.TempMessageText0")
assert temp_var is None
async def test_long_message_text_stored_in_temp_variable(self, mock_state):
@@ -2778,7 +2778,7 @@ class TestLongMessageTextHandling:
assert result == "A" * 600 # Upper on 'A' is still 'A'
# A temp variable should have been created
temp_var = state.get("Local._TempMessageText0")
temp_var = state.get("Local.TempMessageText0")
assert temp_var == long_text
async def test_find_with_long_message_text(self, mock_state):