Purview: Parallelize PSPC cold-cache scope refresh (#5832)

* Parallelize Purview PSPC cold cache path

* Cache Purview payment-required state for scope refresh

* Cache Purview payment-required state for scope refresh

* Align Purview policy action dedupe and 402 caching

 Deduplicate combined policy actions by action and restriction action so restriction-only actions are preserved
without duplicating identical entries. Cache tenant-level payment-required state from background scope refresh so
subsequent calls short-circuit consistently.

* .NET: Implement best-effort caching for background job scope retrieval and add unit tests for cache write failures

* Purview - feat: Enhance ScopedContentProcessor to queue ContentActivityJob when no applicable scopes are found and update related tests

* docs: Update purview package README and AGENTS documentation to reflect caching optimizations and policy enforcement scenarios

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Taisir Hassan
2026-06-09 11:01:21 -07:00
committed by GitHub
Unverified
parent 2a345e5d3b
commit 383d551b86
16 changed files with 917 additions and 228 deletions
+1
View File
@@ -320,4 +320,5 @@ except (PurviewAuthenticationError, PurviewRateLimitError, PurviewRequestError,
- **Streaming Responses**: Post-response policy evaluation presently applies only to non-streaming chat responses.
- **Error Handling**: Use `ignore_exceptions` and `ignore_payment_required` settings for graceful degradation. When enabled, errors are logged but don't fail the request.
- **Caching**: Protection scopes responses and 402 errors are cached by default with a 4-hour TTL. Cache is automatically invalidated when protection scope state changes.
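- **Cold-cache parallelization**: On a `ProtectionScopes` cache miss, `ProcessContent` runs in the foreground while scopes are refreshed in a background task. That first request is therefore evaluated without scope-derived policy actions; later requests use the cached scopes.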
- **Background Processing**: Content Activities and offline Process Content requests are handled asynchronously using background tasks to avoid blocking the main execution flow.
@@ -231,18 +231,19 @@ class ScopedContentProcessor:
cached_ps_resp = await self._cache.get(cache_key)
if cached_ps_resp is not None and isinstance(cached_ps_resp, ProtectionScopesResponse):
ps_resp = cached_ps_resp
else:
ttl = self._settings.get("cache_ttl_seconds")
ttl_seconds = ttl if ttl is not None else 14400
try:
ps_resp = await self._client.get_protection_scopes(ps_req)
await self._cache.set(cache_key, ps_resp, ttl_seconds=ttl_seconds)
except PurviewPaymentRequiredError as ex:
# Cache the exception at tenant level so all subsequent requests for this tenant fail fast
await self._cache.set(tenant_payment_cache_key, ex, ttl_seconds=ttl_seconds)
raise
return await self._process_with_cached_scopes(pc_request, cached_ps_resp, cache_key)
task = asyncio.create_task(self._refresh_protection_scopes_background(ps_req, cache_key, pc_request))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
return await self._call_process_content(pc_request, cache_key, dlp_actions=[])
async def _process_with_cached_scopes(
self,
pc_request: ProcessContentRequest,
ps_resp: ProtectionScopesResponse,
cache_key: str,
) -> ProcessContentResponse:
if ps_resp.scope_identifier:
pc_request.scope_identifier = ps_resp.scope_identifier
@@ -259,13 +260,7 @@ class ScopedContentProcessor:
task.add_done_callback(self._background_tasks.discard)
return ProcessContentResponse(id="204", correlation_id=pc_request.correlation_id)
pc_resp = await self._client.process_content(pc_request)
if pc_request.scope_identifier and pc_resp.protection_scope_state == ProtectionScopeState.MODIFIED:
await self._cache.remove(cache_key)
pc_resp.policy_actions = self._combine_policy_actions(pc_resp.policy_actions, dlp_actions)
return pc_resp
return await self._call_process_content(pc_request, cache_key, dlp_actions=dlp_actions)
# No applicable scopes - send content activities in background
ca_req = ContentActivitiesRequest(
@@ -281,12 +276,52 @@ class ScopedContentProcessor:
# Respond with HttpStatusCode 204(No Content)
return ProcessContentResponse(id="204", correlation_id=pc_request.correlation_id)
async def _call_process_content(
    self,
    pc_request: ProcessContentRequest,
    cache_key: str,
    dlp_actions: list[DlpActionInfo],
) -> ProcessContentResponse:
    """Send a ProcessContent request and post-process the response.

    Args:
        pc_request: The request forwarded to the client.
        cache_key: Key of the cached protection scopes entry; the entry is removed
            when the response reports that the protection scope state was modified.
        dlp_actions: Actions to merge into the response's policy actions. An empty
            list leaves the response's policy actions exactly as returned.

    Returns:
        The ProcessContent response from the client.
    """
    response = await self._client.process_content(pc_request)

    # Only a request that carried a scope identifier can invalidate the cached scopes.
    scopes_are_stale = (
        pc_request.scope_identifier and response.protection_scope_state == ProtectionScopeState.MODIFIED
    )
    if scopes_are_stale:
        await self._cache.remove(cache_key)

    if not dlp_actions:
        return response

    response.policy_actions = self._combine_policy_actions(response.policy_actions, dlp_actions)
    return response
async def _refresh_protection_scopes_background(
    self, ps_req: ProtectionScopesRequest, cache_key: str, pc_request: ProcessContentRequest
) -> None:
    """Fetch protection scopes and warm the cache without blocking the foreground call.

    Cache writes are best-effort: a failing cache provider is logged and neither
    skips the content-activities follow-up nor escapes the background task as an
    unobserved exception. Errors from the service call are logged as warnings
    rather than raised.

    Args:
        ps_req: Protection scopes request sent to the client.
        cache_key: Cache key under which the fetched scopes are stored.
        pc_request: The ProcessContent request that triggered the refresh; used to
            decide whether content activities must be sent because no scope applies.
    """
    ttl = self._settings.get("cache_ttl_seconds")
    ttl_seconds = ttl if ttl is not None else 14400
    try:
        ps_resp = await self._client.get_protection_scopes(ps_req)

        # Best-effort cache warm-up: a cache failure must not drop the
        # content-activities follow-up below.
        try:
            await self._cache.set(cache_key, ps_resp, ttl_seconds=ttl_seconds)
        except Exception as cache_ex:
            logger.warning("Background protection scopes cache write failed: %s", cache_ex)

        should_process, _, _ = self._check_applicable_scopes(pc_request, ps_resp)
        if not should_process:
            ca_req = ContentActivitiesRequest(
                user_id=pc_request.user_id,
                tenant_id=pc_request.tenant_id,
                content_to_process=pc_request.content_to_process,
                correlation_id=pc_request.correlation_id,
            )
            await self._send_content_activities_background(ca_req)
    except PurviewPaymentRequiredError as ex:
        tenant_payment_cache_key = f"purview:payment_required:{ps_req.tenant_id}"
        # An exception raised inside this handler would not be caught by the
        # sibling ``except Exception`` clause, so guard the cache write here.
        try:
            await self._cache.set(tenant_payment_cache_key, ex, ttl_seconds=ttl_seconds)
        except Exception as cache_ex:
            logger.warning("Failed to cache payment-required state: %s", cache_ex)
        logger.warning("Background protection scopes refresh failed with payment required: %s", ex)
    except Exception as ex:
        logger.warning("Background protection scopes refresh failed: %s", ex)
async def _process_content_background(self, pc_request: ProcessContentRequest, cache_key: str) -> None:
"""Process content in background for offline execution mode."""
try:
pc_resp = await self._client.process_content(pc_request)
# If protection scope state is modified, make another PC request and invalidate cache
# If protection scopes changed, invalidate cache and retry once.
if pc_request.scope_identifier and pc_resp.protection_scope_state == ProtectionScopeState.MODIFIED:
await self._cache.remove(cache_key)
await self._client.process_content(pc_request)
@@ -306,14 +341,10 @@ class ScopedContentProcessor:
def _combine_policy_actions(
existing: list[DlpActionInfo] | None, new_actions: list[DlpActionInfo]
) -> list[DlpActionInfo]:
by_key: dict[str, DlpActionInfo] = {}
for a in existing or []:
if a.action:
by_key[a.action] = a
for a in new_actions:
if a.action:
by_key[a.action] = a
return list(by_key.values())
combined: dict[tuple[DlpAction | None, RestrictionAction | None], DlpActionInfo] = {}
for action_info in (existing or []) + new_actions:
combined.setdefault((action_info.action, action_info.restriction_action), action_info)
return list(combined.values())
@staticmethod
def _check_applicable_scopes(
@@ -2,6 +2,7 @@
"""Tests for Purview processor."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -217,10 +218,38 @@ class TestScopedContentProcessor:
assert action1 in combined
assert action2 in combined
async def test_combine_policy_actions_preserves_restriction_only_actions(
    self, processor: ScopedContentProcessor
) -> None:
    """Entries that carry only a restriction action must survive the merge."""
    # The existing entry sets both fields; the incoming one sets only the restriction action.
    with_both_fields = DlpActionInfo(action=DlpAction.OTHER, restrictionAction=RestrictionAction.OTHER)
    restriction_only = DlpActionInfo(restriction_action=RestrictionAction.BLOCK)

    merged = processor._combine_policy_actions(existing=[with_both_fields], new_actions=[restriction_only])

    expected = [with_both_fields, restriction_only]
    assert merged == expected
async def test_combine_policy_actions_deduplicates_by_action_and_restriction(
    self, processor: ScopedContentProcessor
) -> None:
    """An entry repeating both action and restriction action is collapsed into one."""
    original_block = DlpActionInfo(action=DlpAction.BLOCK_ACCESS, restriction_action=RestrictionAction.BLOCK)
    repeated_block = DlpActionInfo(action=DlpAction.BLOCK_ACCESS, restriction_action=RestrictionAction.BLOCK)
    # Same restriction action but no action set: a distinct entry that must be kept.
    restriction_only = DlpActionInfo(restriction_action=RestrictionAction.BLOCK)

    incoming = [repeated_block, restriction_only]
    merged = processor._combine_policy_actions([original_block], incoming)

    assert merged == [original_block, restriction_only]
async def test_process_with_scopes_calls_client_methods(
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test _process_with_scopes calls get_protection_scopes when scopes response is empty."""
"""Test _process_with_scopes calls process_content immediately and warms scopes in background on cache miss."""
from agent_framework_purview._models import (
ContentActivitiesResponse,
ProtectionScopesResponse,
@@ -236,38 +265,91 @@ class TestScopedContentProcessor:
response = await processor._process_with_scopes(request)
mock_client.get_protection_scopes.assert_called_once()
# When no scopes apply, process_content is not called (activities are sent in background)
mock_client.process_content.assert_not_called()
# The response should have id=204 (No Content) when no scopes apply
assert response.id == "204"
# On cache miss, ProcessContent runs in the foreground and the response is returned.
assert response.id == "response-123"
mock_client.process_content.assert_called_once()
async def test_process_with_scopes_ignores_unexpected_cached_value_type(
# Protection scopes are refreshed in a background task.
await asyncio.gather(*list(processor._background_tasks))
mock_client.get_protection_scopes.assert_called_once()
mock_client.send_content_activities.assert_called_once()
async def test_process_with_scopes_preserves_restriction_only_policy_actions(
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test that a corrupted cache entry does not crash processing."""
"""Test cold-cache ProcessContent actions are not dropped when they only contain restrictionAction."""
from agent_framework_purview._models import ProtectionScopesResponse
request = process_content_request_factory()
restriction_only_action = DlpActionInfo(restriction_action=RestrictionAction.BLOCK)
mock_client.get_protection_scopes = AsyncMock(return_value=ProtectionScopesResponse(**{"value": []}))
mock_client.process_content = AsyncMock(
return_value=ProcessContentResponse(
id="response-123",
protection_scope_state="notModified",
policy_actions=[restriction_only_action],
)
)
response = await processor._process_with_scopes(request)
assert response.policy_actions == [restriction_only_action]
await asyncio.gather(*list(processor._background_tasks))
async def test_process_with_cached_scopes_preserves_restriction_only_policy_actions(
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test cached ProtectionScopes actions are not dropped when they only contain restrictionAction."""
from agent_framework_purview._models import (
ExecutionMode,
PolicyLocation,
PolicyScope,
ProcessContentResponse,
ProtectionScopeActivities,
ProtectionScopesResponse,
)
request = process_content_request_factory()
restriction_only_action = DlpActionInfo(restriction_action=RestrictionAction.BLOCK)
process_content_action = DlpActionInfo(action=DlpAction.OTHER, restriction_action=RestrictionAction.OTHER)
scope_location = PolicyLocation(
data_type="microsoft.graph.policyLocationApplication",
value="app-id",
)
scope = PolicyScope(
activities=ProtectionScopeActivities.UPLOAD_TEXT,
locations=[scope_location],
policy_actions=[restriction_only_action],
execution_mode=ExecutionMode.EVALUATE_INLINE,
)
# Return a valid, inline scope so we stay on the normal (non-background) path.
scope_location = PolicyLocation(**{
"@odata.type": "microsoft.graph.policyLocationApplication",
"value": "app-id",
})
scope = PolicyScope(**{
"activities": ProtectionScopeActivities.UPLOAD_TEXT,
"locations": [scope_location],
"execution_mode": ExecutionMode.EVALUATE_INLINE,
})
mock_client.get_protection_scopes = AsyncMock(return_value=ProtectionScopesResponse(**{"value": [scope]}))
processor._cache.get = AsyncMock(
side_effect=[
None,
ProtectionScopesResponse(scope_identifier="scope-123", scopes=[scope]),
]
) # type: ignore[method-assign]
mock_client.process_content = AsyncMock(
return_value=ProcessContentResponse(
id="response-123",
protection_scope_state="notModified",
policy_actions=[process_content_action],
)
)
response = await processor._process_with_scopes(request)
assert response.policy_actions == [process_content_action, restriction_only_action]
async def test_process_with_scopes_ignores_unexpected_cached_value_type(
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test that a corrupted cache entry does not crash processing."""
from agent_framework_purview._models import ProtectionScopesResponse
request = process_content_request_factory()
mock_client.get_protection_scopes = AsyncMock(return_value=ProtectionScopesResponse(**{"value": []}))
mock_client.process_content = AsyncMock(
return_value=ProcessContentResponse(**{"id": "ok", "protectionScopeState": "notModified"})
)
@@ -279,8 +361,9 @@ class TestScopedContentProcessor:
response = await processor._process_with_scopes(request)
assert response.id == "ok"
mock_client.get_protection_scopes.assert_called_once()
mock_client.process_content.assert_called_once()
await asyncio.gather(*list(processor._background_tasks))
mock_client.get_protection_scopes.assert_called_once()
async def test_process_with_scopes_uses_tenant_payment_exception_cache(
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
@@ -301,8 +384,6 @@ class TestScopedContentProcessor:
self, processor: ScopedContentProcessor, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test offline background processing invalidates cache and retries when scope state changes."""
from agent_framework_purview._models import ProcessContentResponse
request = process_content_request_factory()
request.scope_identifier = "etag-1"
@@ -319,6 +400,36 @@ class TestScopedContentProcessor:
processor._cache.remove.assert_called_once_with("purview:protection_scopes:abc")
assert mock_client.process_content.call_count == 2
async def test_background_scope_refresh_caches_payment_required(
    self, mock_client: AsyncMock, process_content_request_factory
) -> None:
    """A 402 from the background scope refresh ends up in the tenant-level cache."""
    from agent_framework_purview._cache import InMemoryCacheProvider
    from agent_framework_purview._exceptions import PurviewPaymentRequiredError

    app_location = PurviewAppLocation(location_type=PurviewLocationType.APPLICATION, location_value="app-id")
    settings = PurviewSettings(
        app_name="Test App",
        tenant_id="12345678-1234-1234-1234-123456789012",
        purview_app_location=app_location,
    )
    cache = InMemoryCacheProvider()
    processor = ScopedContentProcessor(mock_client, settings, cache_provider=cache)

    # The scope lookup fails with payment required while the foreground call succeeds.
    mock_client.get_protection_scopes = AsyncMock(side_effect=PurviewPaymentRequiredError("nope"))
    foreground_response = ProcessContentResponse(**{"id": "pc-1", "protectionScopeState": "notModified"})
    mock_client.process_content = AsyncMock(return_value=foreground_response)

    request = process_content_request_factory()
    await processor._process_with_scopes(request)

    # Let the background refresh finish before inspecting the cache.
    pending = list(processor._background_tasks)
    await asyncio.gather(*pending)

    payment_key = f"purview:payment_required:{request.tenant_id}"
    cached = await cache.get(payment_key)
    assert isinstance(cached, PurviewPaymentRequiredError)
async def test_map_messages_with_user_id_in_additional_properties(self, mock_client: AsyncMock) -> None:
"""Test user_id extraction from message additional_properties."""
settings = PurviewSettings(
@@ -387,6 +498,8 @@ class TestScopedContentProcessor:
self, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test that response is returned when scopes don't apply (activities sent in background)."""
from agent_framework_purview._models import ProtectionScopesResponse
settings = PurviewSettings(
app_name="Test App",
tenant_id="12345678-1234-1234-1234-123456789012",
@@ -398,10 +511,8 @@ class TestScopedContentProcessor:
pc_request = process_content_request_factory()
# Mock get_protection_scopes to return no applicable scopes
mock_ps_response = MagicMock()
mock_ps_response.scopes = []
mock_client.get_protection_scopes.return_value = mock_ps_response
mock_ps_response = ProtectionScopesResponse(scopes=[])
processor._cache.get = AsyncMock(side_effect=[None, mock_ps_response]) # type: ignore[method-assign]
# Mock send_content_activities to return success (called in background)
mock_ca_response = MagicMock()
@@ -410,8 +521,10 @@ class TestScopedContentProcessor:
response = await processor._process_with_scopes(pc_request)
mock_client.get_protection_scopes.assert_called_once()
mock_client.get_protection_scopes.assert_not_called()
mock_client.process_content.assert_not_called()
await asyncio.gather(*list(processor._background_tasks))
mock_client.send_content_activities.assert_called_once()
# Response should have id=204 when no scopes apply
assert response.id == "204"
@@ -419,6 +532,8 @@ class TestScopedContentProcessor:
self, mock_client: AsyncMock, process_content_request_factory
) -> None:
"""Test that errors in background activities don't affect the response."""
from agent_framework_purview._models import ProtectionScopesResponse
settings = PurviewSettings(
app_name="Test App",
tenant_id="12345678-1234-1234-1234-123456789012",
@@ -430,10 +545,8 @@ class TestScopedContentProcessor:
pc_request = process_content_request_factory()
# Mock get_protection_scopes to return no applicable scopes
mock_ps_response = MagicMock()
mock_ps_response.scopes = []
mock_client.get_protection_scopes.return_value = mock_ps_response
mock_ps_response = ProtectionScopesResponse(scopes=[])
processor._cache.get = AsyncMock(side_effect=[None, mock_ps_response]) # type: ignore[method-assign]
# Mock send_content_activities to return error (called in background task)
mock_ca_response = MagicMock()
@@ -445,6 +558,8 @@ class TestScopedContentProcessor:
# Since activities are sent in background, errors don't affect the response
# Response should have id=204 when no scopes apply
assert response.id == "204"
await asyncio.gather(*list(processor._background_tasks))
mock_client.send_content_activities.assert_called_once()
class TestUserIdResolution:
@@ -656,10 +771,12 @@ class TestScopedContentProcessorCaching:
mock_client.get_protection_scopes.return_value = ProtectionScopesResponse(
scope_identifier="scope-123", scopes=[]
)
mock_client.process_content.return_value = ProcessContentResponse(id="ok", protection_scope_state="notModified")
messages = [Message(role="user", contents=["Test"])]
await processor.process_messages(messages, Activity.UPLOAD_TEXT, user_id="12345678-1234-1234-1234-123456789012")
await asyncio.gather(*list(processor._background_tasks))
mock_client.get_protection_scopes.assert_called_once()
@@ -670,7 +787,7 @@ class TestScopedContentProcessorCaching:
async def test_payment_required_exception_cached_at_tenant_level(
self, mock_client: AsyncMock, settings: PurviewSettings
) -> None:
"""Test that 402 payment required exceptions are cached at tenant level."""
"""Test that background scope 402 returns once, then throws from the tenant-level cache."""
from agent_framework_purview._cache import InMemoryCacheProvider
from agent_framework_purview._exceptions import PurviewPaymentRequiredError
@@ -678,13 +795,12 @@ class TestScopedContentProcessorCaching:
processor = ScopedContentProcessor(mock_client, settings, cache_provider=cache_provider)
mock_client.get_protection_scopes.side_effect = PurviewPaymentRequiredError("Payment required")
mock_client.process_content.return_value = ProcessContentResponse(id="ok", protection_scope_state="notModified")
messages = [Message(role="user", contents=["Test"])]
with pytest.raises(PurviewPaymentRequiredError):
await processor.process_messages(
messages, Activity.UPLOAD_TEXT, user_id="12345678-1234-1234-1234-123456789012"
)
await processor.process_messages(messages, Activity.UPLOAD_TEXT, user_id="12345678-1234-1234-1234-123456789012")
await asyncio.gather(*list(processor._background_tasks))
mock_client.get_protection_scopes.assert_called_once()