//! Collects markdown stream source at newline boundaries. //! //! `MarkdownStreamCollector` buffers incoming token deltas and exposes a commit boundary at each //! newline. The stream controllers (`streaming/controller.rs`) call `commit_complete_source()` //! after each newline-bearing delta to obtain the completed prefix for re-rendering, leaving the //! trailing incomplete line in the buffer for the next delta. //! //! On finalization, `finalize_and_drain_source()` flushes whatever remains (the last line, which //! may lack a trailing newline). #[cfg(test)] use ratatui::text::Line; use std::path::Path; #[cfg(test)] use std::path::PathBuf; #[cfg(test)] use crate::markdown; /// Newline-gated accumulator that buffers raw markdown source and commits only completed lines. /// /// The buffer tracks how many source bytes have already been committed via /// `committed_source_len`, so each `commit_complete_source()` call returns only the newly /// completed portion. This design lets the stream controller re-render the entire accumulated /// source while only appending new content. /// /// The collector does not parse markdown in production. It only defines stable source boundaries; /// rendering lives in the stream controllers so width changes can re-render from one accumulated /// source string. pub(crate) struct MarkdownStreamCollector { buffer: String, committed_source_len: usize, #[cfg(test)] committed_line_count: usize, width: Option, #[cfg(test)] cwd: PathBuf, } impl MarkdownStreamCollector { /// Create a collector that accumulates raw markdown deltas. /// /// `width` and `cwd` are only used by test-only rendering helpers; production stream commits /// operate on raw source boundaries. The collector snapshots `cwd` so test rendering keeps /// local file-link display stable across incremental commits. pub fn new(width: Option, cwd: &Path) -> Self { #[cfg(not(test))] let _ = cwd; Self { buffer: String::new(), committed_source_len: 0, #[cfg(test)] committed_line_count: 0, width, #[cfg(test)] cwd: cwd.to_path_buf(), } } /// Update the rendering width used by test-only line-commit helpers. pub fn set_width(&mut self, width: Option) { self.width = width; } /// Reset all buffered source and commit bookkeeping. pub fn clear(&mut self) { self.buffer.clear(); self.committed_source_len = 0; #[cfg(test)] { self.committed_line_count = 0; } } /// Append a raw streaming delta to the internal source buffer. pub fn push_delta(&mut self, delta: &str) { tracing::trace!("push_delta: {delta:?}"); self.buffer.push_str(delta); } /// Commit newly completed raw markdown source up to the last newline. /// /// This returns only source that has not been returned by a previous commit. Calling it after a /// delta without a newline returns `None`, which prevents the live stream from rendering /// incomplete markdown blocks that may change meaning when the rest of the line arrives. pub fn commit_complete_source(&mut self) -> Option { let commit_end = self.buffer.rfind('\n').map(|idx| idx + 1)?; if commit_end <= self.committed_source_len { return None; } let out = self.buffer[self.committed_source_len..commit_end].to_string(); self.committed_source_len = commit_end; Some(out) } /// Finalize the stream and return any remaining raw source. /// /// Ensures the returned source chunk is newline-terminated when non-empty so callers can /// safely run markdown block parsing on the final chunk. This method clears the collector; /// callers should not invoke it until the stream is truly complete or interrupted output is /// being intentionally consolidated. pub fn finalize_and_drain_source(&mut self) -> String { if self.committed_source_len >= self.buffer.len() { self.clear(); return String::new(); } let mut out = self.buffer[self.committed_source_len..].to_string(); if !out.ends_with('\n') { out.push('\n'); } self.clear(); out } /// Render the full buffer and return only the newly completed logical lines /// since the last commit. When the buffer does not end with a newline, the /// final rendered line is considered incomplete and is not emitted. /// /// This helper intentionally uses `append_markdown` (not /// `append_markdown_agent`) so tests can isolate collector newline boundary /// behavior without stream-controller holdback semantics. #[cfg(test)] pub fn commit_complete_lines(&mut self) -> Vec> { let Some(commit_end) = self.buffer.rfind('\n').map(|idx| idx + 1) else { return Vec::new(); }; if commit_end <= self.committed_source_len { return Vec::new(); } let source = self.buffer[..commit_end].to_string(); let mut rendered: Vec> = Vec::new(); markdown::append_markdown(&source, self.width, Some(self.cwd.as_path()), &mut rendered); let mut complete_line_count = rendered.len(); if complete_line_count > 0 && crate::render::line_utils::is_blank_line_spaces_only( &rendered[complete_line_count - 1], ) { complete_line_count -= 1; } if self.committed_line_count >= complete_line_count { return Vec::new(); } let out_slice = &rendered[self.committed_line_count..complete_line_count]; let out = out_slice.to_vec(); self.committed_source_len = commit_end; self.committed_line_count = complete_line_count; out } /// Finalize the stream: emit all remaining lines beyond the last commit. /// If the buffer does not end with a newline, a temporary one is appended /// for rendering. #[cfg(test)] pub fn finalize_and_drain(&mut self) -> Vec> { let mut source = self.buffer.clone(); if source.is_empty() { self.clear(); return Vec::new(); } if !source.ends_with('\n') { source.push('\n'); }; tracing::debug!( raw_len = self.buffer.len(), source_len = source.len(), "markdown finalize (raw length: {}, rendered length: {})", self.buffer.len(), source.len() ); tracing::trace!("markdown finalize (raw source):\n---\n{source}\n---"); let mut rendered: Vec> = Vec::new(); markdown::append_markdown(&source, self.width, Some(self.cwd.as_path()), &mut rendered); let out = if self.committed_line_count >= rendered.len() { Vec::new() } else { rendered[self.committed_line_count..].to_vec() }; // Reset collector state for next stream. self.clear(); out } } #[cfg(test)] fn test_cwd() -> PathBuf { // These tests only need a stable absolute cwd; using temp_dir() avoids baking Unix- or // Windows-specific root semantics into the fixtures. std::env::temp_dir() } #[cfg(test)] pub(crate) fn simulate_stream_markdown_for_tests( deltas: &[&str], finalize: bool, ) -> Vec> { let mut collector = MarkdownStreamCollector::new(/*width*/ None, &test_cwd()); let mut out = Vec::new(); for d in deltas { collector.push_delta(d); if d.contains('\n') { out.extend(collector.commit_complete_lines()); } } if finalize { out.extend(collector.finalize_and_drain()); } out } #[cfg(test)] mod tests { use super::*; use ratatui::style::Color; #[tokio::test] async fn no_commit_until_newline() { let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("Hello, world"); let out = c.commit_complete_lines(); assert!(out.is_empty(), "should not commit without newline"); c.push_delta("!\n"); let out2 = c.commit_complete_lines(); assert_eq!(out2.len(), 1, "one completed line after newline"); } #[tokio::test] async fn finalize_commits_partial_line() { let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("Line without newline"); let out = c.finalize_and_drain(); assert_eq!(out.len(), 1); } #[tokio::test] async fn e2e_stream_blockquote_simple_is_green() { let out = super::simulate_stream_markdown_for_tests(&["> Hello\n"], /*finalize*/ true); assert_eq!(out.len(), 1); let l = &out[0]; assert_eq!( l.style.fg, Some(Color::Green), "expected blockquote line fg green, got {:?}", l.style.fg ); } #[tokio::test] async fn e2e_stream_blockquote_nested_is_green() { let out = super::simulate_stream_markdown_for_tests( &["> Level 1\n>> Level 2\n"], /*finalize*/ true, ); // Filter out any blank lines that may be inserted at paragraph starts. let non_blank: Vec<_> = out .into_iter() .filter(|l| { let s = l .spans .iter() .map(|sp| sp.content.clone()) .collect::>() .join(""); let t = s.trim(); // Ignore quote-only blank lines like ">" inserted at paragraph boundaries. !(t.is_empty() || t == ">") }) .collect(); assert_eq!(non_blank.len(), 2); assert_eq!(non_blank[0].style.fg, Some(Color::Green)); assert_eq!(non_blank[1].style.fg, Some(Color::Green)); } #[tokio::test] async fn e2e_stream_blockquote_with_list_items_is_green() { let out = super::simulate_stream_markdown_for_tests( &["> - item 1\n> - item 2\n"], /*finalize*/ true, ); assert_eq!(out.len(), 2); assert_eq!(out[0].style.fg, Some(Color::Green)); assert_eq!(out[1].style.fg, Some(Color::Green)); } #[tokio::test] async fn e2e_stream_nested_mixed_lists_ordered_marker_is_light_blue() { let md = [ "1. First\n", " - Second level\n", " 1. Third level (ordered)\n", " - Fourth level (bullet)\n", " - Fifth level to test indent consistency\n", ]; let out = super::simulate_stream_markdown_for_tests(&md, /*finalize*/ true); // Find the line that contains the third-level ordered text let find_idx = out.iter().position(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::() .contains("Third level (ordered)") }); let idx = find_idx.expect("expected third-level ordered line"); let line = &out[idx]; // Expect at least one span on this line to be styled light blue let has_light_blue = line .spans .iter() .any(|s| s.style.fg == Some(ratatui::style::Color::LightBlue)); assert!( has_light_blue, "expected an ordered-list marker span with light blue fg on: {line:?}" ); } #[tokio::test] async fn e2e_stream_blockquote_wrap_preserves_green_style() { let long = "> This is a very long quoted line that should wrap across multiple columns to verify style preservation."; let out = super::simulate_stream_markdown_for_tests(&[long, "\n"], /*finalize*/ true); // Wrap to a narrow width to force multiple output lines. let wrapped = crate::wrapping::word_wrap_lines( out.iter(), crate::wrapping::RtOptions::new(/*width*/ 24), ); // Filter out purely blank lines let non_blank: Vec<_> = wrapped .into_iter() .filter(|l| { let s = l .spans .iter() .map(|sp| sp.content.clone()) .collect::>() .join(""); !s.trim().is_empty() }) .collect(); assert!( non_blank.len() >= 2, "expected wrapped blockquote to span multiple lines" ); for (i, l) in non_blank.iter().enumerate() { assert_eq!( l.spans[0].style.fg, Some(Color::Green), "wrapped line {} should preserve green style, got {:?}", i, l.spans[0].style.fg ); } } #[tokio::test] async fn heading_starts_on_new_line_when_following_paragraph() { // Stream a paragraph line, then a heading on the next line. // Expect two distinct rendered lines: "Hello." and "Heading". let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("Hello.\n"); let out1 = c.commit_complete_lines(); let s1: Vec = out1 .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect(); assert_eq!( out1.len(), 1, "first commit should contain only the paragraph line, got {}: {:?}", out1.len(), s1 ); c.push_delta("## Heading\n"); let out2 = c.commit_complete_lines(); let s2: Vec = out2 .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect(); assert_eq!( s2, vec!["", "## Heading"], "expected a blank separator then the heading line" ); let line_to_string = |l: &ratatui::text::Line<'_>| -> String { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }; assert_eq!(line_to_string(&out1[0]), "Hello."); assert_eq!(line_to_string(&out2[1]), "## Heading"); } #[tokio::test] async fn heading_not_inlined_when_split_across_chunks() { // Paragraph without trailing newline, then a chunk that starts with the newline // and the heading text, then a final newline. The collector should first commit // only the paragraph line, and later commit the heading as its own line. let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("Sounds good!"); // No commit yet assert!(c.commit_complete_lines().is_empty()); // Introduce the newline that completes the paragraph and the start of the heading. c.push_delta("\n## Adding Bird subcommand"); let out1 = c.commit_complete_lines(); let s1: Vec = out1 .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect(); assert_eq!( s1, vec!["Sounds good!"], "expected paragraph followed by blank separator before heading chunk" ); // Now finish the heading line with the trailing newline. c.push_delta("\n"); let out2 = c.commit_complete_lines(); let s2: Vec = out2 .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect(); assert_eq!( s2, vec!["", "## Adding Bird subcommand"], "expected the heading line only on the final commit" ); // Sanity check raw markdown rendering for a simple line does not produce spurious extras. let mut rendered: Vec> = Vec::new(); let test_cwd = super::test_cwd(); crate::markdown::append_markdown( "Hello.\n", /*width*/ None, Some(test_cwd.as_path()), &mut rendered, ); let rendered_strings: Vec = rendered .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect(); assert_eq!( rendered_strings, vec!["Hello."], "unexpected markdown lines: {rendered_strings:?}" ); } fn lines_to_plain_strings(lines: &[ratatui::text::Line<'_>]) -> Vec { lines .iter() .map(|l| { l.spans .iter() .map(|s| s.content.clone()) .collect::>() .join("") }) .collect() } #[tokio::test] async fn table_header_commits_without_holdback() { let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("| A | B |\n"); let out1 = c.commit_complete_lines(); let out1_str = lines_to_plain_strings(&out1); assert_eq!(out1_str, vec!["| A | B |".to_string()]); c.push_delta("| --- | --- |\n"); let out = c.commit_complete_lines(); let out_str = lines_to_plain_strings(&out); assert!( !out_str.is_empty(), "expected output to continue committing after delimiter: {out_str:?}" ); c.push_delta("| 1 | 2 |\n"); let out2 = c.commit_complete_lines(); assert!( !out2.is_empty(), "expected output to continue committing after body row" ); c.push_delta("\n"); let _ = c.commit_complete_lines(); } #[tokio::test] async fn pipe_text_without_table_prefix_is_not_delayed() { let mut c = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); c.push_delta("Escaped pipe in text: a | b | c\n"); let out = c.commit_complete_lines(); let out_str = lines_to_plain_strings(&out); assert_eq!(out_str, vec!["Escaped pipe in text: a | b | c".to_string()]); } #[tokio::test] async fn lists_and_fences_commit_without_duplication() { // List case assert_streamed_equals_full(&["- a\n- ", "b\n- c\n"]).await; // Fenced code case: stream in small chunks assert_streamed_equals_full(&["```", "\nco", "de 1\ncode 2\n", "```\n"]).await; } #[tokio::test] async fn utf8_boundary_safety_and_wide_chars() { // Emoji (wide), CJK, control char, digit + combining macron sequences let input = "🙂🙂🙂\n汉字挢字\nA\u{0003}0\u{0304}\n"; let deltas = vec![ "🙂", "🙂", "🙂\n汉", "字挢", "字\nA", "\u{0003}", "0", "\u{0304}", "\n", ]; let streamed = simulate_stream_markdown_for_tests(&deltas, /*finalize*/ true); let streamed_str = lines_to_plain_strings(&streamed); let mut rendered_all: Vec> = Vec::new(); let test_cwd = super::test_cwd(); crate::markdown::append_markdown( input, /*width*/ None, Some(test_cwd.as_path()), &mut rendered_all, ); let rendered_all_str = lines_to_plain_strings(&rendered_all); assert_eq!( streamed_str, rendered_all_str, "utf8/wide-char streaming should equal full render without duplication or truncation" ); } #[tokio::test] async fn e2e_stream_deep_nested_third_level_marker_is_light_blue() { let md = "1. First\n - Second level\n 1. Third level (ordered)\n - Fourth level (bullet)\n - Fifth level to test indent consistency\n"; let streamed = super::simulate_stream_markdown_for_tests(&[md], /*finalize*/ true); let streamed_strs = lines_to_plain_strings(&streamed); // Locate the third-level line in the streamed output; avoid relying on exact indent. let target_suffix = "1. Third level (ordered)"; let mut found = None; for line in &streamed { let s: String = line.spans.iter().map(|sp| sp.content.clone()).collect(); if s.contains(target_suffix) { found = Some(line.clone()); break; } } let line = found.unwrap_or_else(|| { panic!("expected to find the third-level ordered list line; got: {streamed_strs:?}") }); // The marker (including indent and "1.") is expected to be in the first span // and colored LightBlue; following content should be default color. assert!( !line.spans.is_empty(), "expected non-empty spans for the third-level line" ); let marker_span = &line.spans[0]; assert_eq!( marker_span.style.fg, Some(Color::LightBlue), "expected LightBlue 3rd-level ordered marker, got {:?}", marker_span.style.fg ); // Find the first non-empty non-space content span and verify it is default color. let mut content_fg = None; for sp in &line.spans[1..] { let t = sp.content.trim(); if !t.is_empty() { content_fg = Some(sp.style.fg); break; } } assert_eq!( content_fg.flatten(), None, "expected default color for 3rd-level content, got {content_fg:?}" ); } #[tokio::test] async fn empty_fenced_block_is_dropped_and_separator_preserved_before_heading() { // An empty fenced code block followed by a heading should not render the fence, // but should preserve a blank separator line so the heading starts on a new line. let deltas = vec!["```bash\n```\n", "## Heading\n"]; // empty block and close in same commit let streamed = simulate_stream_markdown_for_tests(&deltas, /*finalize*/ true); let texts = lines_to_plain_strings(&streamed); assert!( texts.iter().all(|s| !s.contains("```")), "no fence markers expected: {texts:?}" ); // Expect the heading and no fence markers. A blank separator may or may not be rendered at start. assert!( texts.iter().any(|s| s == "## Heading"), "expected heading line: {texts:?}" ); } #[tokio::test] async fn paragraph_then_empty_fence_then_heading_keeps_heading_on_new_line() { let deltas = vec!["Para.\n", "```\n```\n", "## Title\n"]; // empty fence block in one commit let streamed = simulate_stream_markdown_for_tests(&deltas, /*finalize*/ true); let texts = lines_to_plain_strings(&streamed); let para_idx = match texts.iter().position(|s| s == "Para.") { Some(i) => i, None => panic!("para present"), }; let head_idx = match texts.iter().position(|s| s == "## Title") { Some(i) => i, None => panic!("heading present"), }; assert!( head_idx > para_idx, "heading should not merge with paragraph: {texts:?}" ); } #[tokio::test] async fn loose_list_with_split_dashes_matches_full_render() { // Minimized failing sequence discovered by the helper: two chunks // that still reproduce the mismatch. let deltas = vec!["- item.\n\n", "-"]; let streamed = simulate_stream_markdown_for_tests(&deltas, /*finalize*/ true); let streamed_strs = lines_to_plain_strings(&streamed); let full: String = deltas.iter().copied().collect(); let mut rendered_all: Vec> = Vec::new(); let test_cwd = super::test_cwd(); crate::markdown::append_markdown( &full, /*width*/ None, Some(test_cwd.as_path()), &mut rendered_all, ); let rendered_all_strs = lines_to_plain_strings(&rendered_all); assert_eq!( streamed_strs, rendered_all_strs, "streamed output should match full render without dangling '-' lines" ); } #[tokio::test] async fn loose_vs_tight_list_items_streaming_matches_full() { // Deltas extracted from the session log around 2025-08-27T00:33:18.216Z let deltas = vec![ "\n\n", "Loose", " vs", ".", " tight", " list", " items", ":\n", "1", ".", " Tight", " item", "\n", "2", ".", " Another", " tight", " item", "\n\n", "1", ".", " Loose", " item", " with", " its", " own", " paragraph", ".\n\n", " ", " This", " paragraph", " belongs", " to", " the", " same", " list", " item", ".\n\n", "2", ".", " Second", " loose", " item", " with", " a", " nested", " list", " after", " a", " blank", " line", ".\n\n", " ", " -", " Nested", " bullet", " under", " a", " loose", " item", "\n", " ", " -", " Another", " nested", " bullet", "\n\n", ]; let streamed = simulate_stream_markdown_for_tests(&deltas, /*finalize*/ true); let streamed_strs = lines_to_plain_strings(&streamed); // Compute a full render for diagnostics only. let full: String = deltas.iter().copied().collect(); let mut rendered_all: Vec> = Vec::new(); let test_cwd = super::test_cwd(); crate::markdown::append_markdown( &full, /*width*/ None, Some(test_cwd.as_path()), &mut rendered_all, ); // Also assert exact expected plain strings for clarity. let expected = vec![ "Loose vs. tight list items:".to_string(), "".to_string(), "1. Tight item".to_string(), "2. Another tight item".to_string(), "3. Loose item with its own paragraph.".to_string(), "".to_string(), " This paragraph belongs to the same list item.".to_string(), "".to_string(), "4. Second loose item with a nested list after a blank line.".to_string(), " - Nested bullet under a loose item".to_string(), " - Another nested bullet".to_string(), ]; assert_eq!( streamed_strs, expected, "expected exact rendered lines for loose/tight section" ); } // Targeted tests derived from fuzz findings. Each asserts streamed == full render. async fn assert_streamed_equals_full(deltas: &[&str]) { let streamed = simulate_stream_markdown_for_tests(deltas, /*finalize*/ true); let streamed_strs = lines_to_plain_strings(&streamed); let full: String = deltas.iter().copied().collect(); let mut rendered: Vec> = Vec::new(); let test_cwd = super::test_cwd(); crate::markdown::append_markdown( &full, /*width*/ None, Some(test_cwd.as_path()), &mut rendered, ); let rendered_strs = lines_to_plain_strings(&rendered); assert_eq!(streamed_strs, rendered_strs, "full:\n---\n{full}\n---"); } #[tokio::test] async fn fuzz_class_bullet_duplication_variant_1() { assert_streamed_equals_full(&[ "aph.\n- let one\n- bull", "et two\n\n second paragraph \n", ]) .await; } #[tokio::test] async fn fuzz_class_bullet_duplication_variant_2() { assert_streamed_equals_full(&[ "- e\n c", "e\n- bullet two\n\n second paragraph in bullet two\n", ]) .await; } #[tokio::test] async fn streaming_html_block_then_text_matches_full() { assert_streamed_equals_full(&[ "HTML block:\n", "
inline block
\n", "more stuff\n", ]) .await; } #[tokio::test] async fn table_like_lines_inside_fenced_code_are_not_held() { assert_streamed_equals_full(&["```\n", "| a | b |\n", "```\n"]).await; } #[tokio::test] async fn collector_source_chunks_round_trip_into_agent_fence_unwrapping() { let deltas = [ "```md\n", "| A | B |\n", "|---|---|\n", "| 1 | 2 |\n", "```\n", ]; let mut collector = super::MarkdownStreamCollector::new(/*width*/ None, &super::test_cwd()); let mut raw_source = String::new(); for delta in deltas { collector.push_delta(delta); if delta.contains('\n') && let Some(chunk) = collector.commit_complete_source() { raw_source.push_str(&chunk); } } raw_source.push_str(&collector.finalize_and_drain_source()); let mut rendered = Vec::new(); crate::markdown::append_markdown_agent(&raw_source, /*width*/ None, &mut rendered); let rendered_strs = lines_to_plain_strings(&rendered); assert!( rendered_strs.iter().any(|line| line.contains('━')), "expected markdown-fenced table to render with a separator: {rendered_strs:?}" ); assert!( !rendered_strs.iter().any(|line| line.trim() == "| A | B |"), "did not expect raw table header after markdown-fence unwrapping: {rendered_strs:?}" ); } }