From 384cbad20630a6b73e66ce17145a0914327373ee Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 20:56:21 -0600 Subject: [PATCH 01/11] Define negotiate policy contract and tracking state --- crates/rally-workflow-plan/src/lib.rs | 445 ++++++++++++++++++++++++-- 1 file changed, 412 insertions(+), 33 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 101b98e..fe11ba6 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -48,6 +48,153 @@ pub struct FinalizationState { pub reopened_for_issues: bool, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RegistrationPolicyContract { + #[serde(default = "default_registration_min_agents")] + pub min_agents: u32, + #[serde(default)] + pub enforce_unique_identity: bool, + #[serde(default)] + pub fail_closed_missing_peers: bool, + #[serde(default)] + pub allow_single_agent_fallback: bool, +} + +impl Default for RegistrationPolicyContract { + fn default() -> Self { + Self { + min_agents: default_registration_min_agents(), + enforce_unique_identity: false, + fail_closed_missing_peers: false, + allow_single_agent_fallback: true, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DraftIssuePolicyContract { + #[serde(default)] + pub enable_private_drafts: bool, + #[serde(default)] + pub merge_before_negotiation: bool, +} + +impl Default for DraftIssuePolicyContract { + fn default() -> Self { + Self { + enable_private_drafts: false, + merge_before_negotiation: false, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct AgreementPolicyContract { + #[serde(default)] + pub require_challenge_before_agree: bool, + #[serde(default)] + pub disallow_self_agreement: bool, + #[serde(default)] + pub min_cross_agent_challenges: u32, +} + +impl Default for AgreementPolicyContract { + fn default() -> Self { + Self { + require_challenge_before_agree: false, + disallow_self_agreement: false, + min_cross_agent_challenges: 0, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct FinalizationPolicyContract { + #[serde(default)] + pub require_coverage_audit: bool, + #[serde(default)] + pub require_final_doc: bool, + #[serde(default)] + pub require_writer_reviewer_separation: bool, + #[serde(default)] + pub allow_single_agent_fallback: bool, + #[serde(default)] + pub min_agreed_issues: u32, +} + +impl Default for FinalizationPolicyContract { + fn default() -> Self { + Self { + require_coverage_audit: false, + require_final_doc: false, + require_writer_reviewer_separation: false, + allow_single_agent_fallback: true, + min_agreed_issues: 0, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] +pub struct PlanPolicyContract { + #[serde(default)] + pub registration: RegistrationPolicyContract, + #[serde(default)] + pub draft_issue_phase: DraftIssuePolicyContract, + #[serde(default)] + pub agreement: AgreementPolicyContract, + #[serde(default)] + pub finalization: FinalizationPolicyContract, +} + +impl PlanPolicyContract { + pub fn strict_negotiate() -> Self { + Self { + registration: RegistrationPolicyContract { + min_agents: 3, + enforce_unique_identity: true, + fail_closed_missing_peers: true, + allow_single_agent_fallback: false, + }, + draft_issue_phase: DraftIssuePolicyContract { + enable_private_drafts: true, + merge_before_negotiation: true, + }, + agreement: AgreementPolicyContract { + require_challenge_before_agree: true, + disallow_self_agreement: true, + min_cross_agent_challenges: 2, + }, + finalization: FinalizationPolicyContract { + require_coverage_audit: true, + require_final_doc: true, + require_writer_reviewer_separation: true, + allow_single_agent_fallback: false, + min_agreed_issues: 1, + }, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DraftIssueState { + pub draft_id: u32, + pub title: String, + pub slug: String, + pub author: String, + pub file: String, + pub created_at: DateTime, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct NegotiationTrackingState { + #[serde(default)] + pub analysis_completed_agents: BTreeSet, + #[serde(default)] + pub draft_issues: Vec, + #[serde(default)] + pub issues_with_challenges: BTreeSet, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PlanWorkflowState { #[serde(default = "default_state_version")] @@ -58,6 +205,8 @@ pub struct PlanWorkflowState { pub issues: Vec, #[serde(default)] pub finalization: FinalizationState, + #[serde(default)] + pub tracking: NegotiationTrackingState, } impl Default for PlanWorkflowState { @@ -67,6 +216,7 @@ impl Default for PlanWorkflowState { turn: None, issues: Vec::new(), finalization: FinalizationState::default(), + tracking: NegotiationTrackingState::default(), } } } @@ -85,6 +235,10 @@ pub struct PlanDoneDispatch { } fn default_state_version() -> u32 { + 2 +} + +fn default_registration_min_agents() -> u32 { 1 } @@ -95,6 +249,15 @@ impl PlanWorkflowState { { bail!("workflow_state turn holder cannot be empty"); } + let mut seen_drafts = BTreeSet::new(); + for draft in &self.tracking.draft_issues { + if draft.author.trim().is_empty() { + bail!("workflow_state draft issue author cannot be empty"); + } + if !seen_drafts.insert(draft.draft_id) { + bail!("workflow_state draft issue ids must be unique"); + } + } Ok(()) } } @@ -331,6 +494,7 @@ pub fn file_issue( let question = question.unwrap_or(""); let context = context.unwrap_or(""); + let created_at = now(); let template = format!( "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", next_id, title, title, question, context @@ -341,15 +505,25 @@ pub fn file_issue( workflow_state.issues.push(IssueState { id: next_id, slug, - file: file_name, + file: file_name.clone(), title: title.to_string(), author: agent.to_string(), status: IssueStatus::Open, positions: Default::default(), challenges: Default::default(), agreed_by: None, - created_at: now(), + created_at, }); + if state.phase == SessionPhase::Analysis { + workflow_state.tracking.draft_issues.push(DraftIssueState { + draft_id: next_id, + title: title.to_string(), + slug: slugify(title), + author: agent.to_string(), + file: file_name.clone(), + created_at, + }); + } if state.phase == SessionPhase::FinalizationReview { workflow_state.finalization.reopened_for_issues = true; @@ -374,37 +548,41 @@ pub fn challenge_issue( ensure_turn_holder(state, &workflow_state, agent)?; refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; - let issue = workflow_state - .issues - .iter_mut() - .find(|i| i.id == issue_id) - .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + let (current_issue_id, position) = { + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; - if issue.status != IssueStatus::Open { - bail!("issue {:02} is not open", issue_id); - } + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } - let current_issue_id = issue.id; - let position = position_path(session_dir, issue, agent); - if let Some(parent) = position.parent() { - fs::create_dir_all(parent)?; - } + let current_issue_id = issue.id; + let position = position_path(session_dir, issue, agent); + if let Some(parent) = position.parent() { + fs::create_dir_all(parent)?; + } - if !position.exists() { - fs::write( - &position, - format!("# Position for issue {:02}\n\n", current_issue_id), - )?; - } + if !position.exists() { + fs::write( + &position, + format!("# Position for issue {:02}\n\n", current_issue_id), + )?; + } - let marker = format!( - "\n## Challenge {}\n- Add your challenge details here.\n", - now().to_rfc3339() - ); - append_text(&position, &marker)?; + let marker = format!( + "\n## Challenge {}\n- Add your challenge details here.\n", + now().to_rfc3339() + ); + append_text(&position, &marker)?; - issue.positions.insert(agent.to_string()); - issue.challenges.insert(agent.to_string()); + issue.positions.insert(agent.to_string()); + issue.challenges.insert(agent.to_string()); + (current_issue_id, position) + }; + sync_issue_challenge_tracking(&mut workflow_state, current_issue_id); let position_display = position.display().to_string(); write_workflow_state(state, &workflow_state)?; @@ -507,6 +685,10 @@ pub fn mark_done(state: &mut SessionState, agent: &str, session_dir: &Path) -> R if let Some(agent_state) = state.agents.get_mut(agent) { agent_state.phase_status = "analysis_done".to_string(); } + workflow_state + .tracking + .analysis_completed_agents + .insert(agent.to_string()); if state .agents .values() @@ -697,6 +879,9 @@ fn refresh_negotiation_state_in_place( } } } + if recompute_challenge_tracking(workflow_state) { + changed = true; + } if maybe_resolve_negotiation(state, workflow_state) { changed = true; @@ -855,6 +1040,7 @@ fn renumber_issues_interleaved( } workflow_state.issues = interleaved; + let _ = recompute_challenge_tracking(workflow_state); Ok(()) } @@ -888,6 +1074,40 @@ fn issue_dir_path(session_dir: &Path, issue_id: u32, slug: &str) -> PathBuf { .join(format!("{:02}-{slug}", issue_id)) } +fn sync_issue_challenge_tracking(workflow_state: &mut PlanWorkflowState, issue_id: u32) { + if let Some(issue) = workflow_state + .issues + .iter() + .find(|issue| issue.id == issue_id) + { + if issue.challenges.is_empty() { + workflow_state + .tracking + .issues_with_challenges + .remove(&issue_id); + } else { + workflow_state + .tracking + .issues_with_challenges + .insert(issue_id); + } + } +} + +fn recompute_challenge_tracking(workflow_state: &mut PlanWorkflowState) -> bool { + let mut next = BTreeSet::new(); + for issue in &workflow_state.issues { + if !issue.challenges.is_empty() { + next.insert(issue.id); + } + } + if next == workflow_state.tracking.issues_with_challenges { + return false; + } + workflow_state.tracking.issues_with_challenges = next; + true +} + fn append_text(path: &Path, text: &str) -> Result<()> { let existing = fs::read_to_string(path).unwrap_or_default(); fs::write(path, format!("{existing}{text}"))?; @@ -951,6 +1171,10 @@ fn slugify(input: &str) -> String { pub trait PlanPolicy { fn id(&self) -> &'static str; + fn contract(&self) -> PlanPolicyContract { + PlanPolicyContract::default() + } + fn allow_registration(&self, _state: &SessionState) -> Result<()> { Ok(()) } @@ -985,8 +1209,13 @@ impl PlanPolicy for StrictNegotiationPolicy { "strict-negotiate" } + fn contract(&self) -> PlanPolicyContract { + PlanPolicyContract::strict_negotiate() + } + fn allow_registration(&self, state: &SessionState) -> Result<()> { - if state.config.expected_agents < 3 { + let contract = self.contract(); + if state.config.expected_agents < contract.registration.min_agents { bail!( "strict negotiate policy requires at least 3 agents (got {})", state.config.expected_agents @@ -1012,34 +1241,55 @@ impl PlanPolicy for StrictNegotiationPolicy { } fn allow_agree(&self, state: &SessionState, issue_id: u32, agent: &str) -> Result<()> { + let contract = self.contract(); let workflow_state = read_workflow_state(state)?; let issue = workflow_state .issues .iter() .find(|issue| issue.id == issue_id) .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + if !contract.agreement.require_challenge_before_agree { + return Ok(()); + } let cross_agent_challenges = issue .challenges .iter() .filter(|name| *name != agent) .count(); - if cross_agent_challenges < 2 { + let required = contract.agreement.min_cross_agent_challenges as usize; + if cross_agent_challenges < required { + if required == 2 { + bail!( + "strict negotiate policy requires at least two cross-agent challenges before agree" + ); + } bail!( - "strict negotiate policy requires at least two cross-agent challenges before agree" + "strict negotiate policy requires at least {} cross-agent challenges before agree", + required ); } Ok(()) } fn allow_finalize(&self, state: &SessionState, _agent: &str) -> Result<()> { + let contract = self.contract(); let workflow_state = read_workflow_state(state)?; let agreed = workflow_state .issues .iter() .filter(|issue| issue.status == IssueStatus::Agreed) .count(); - if agreed == 0 { - bail!("strict negotiate policy requires at least one AGREED issue before finalization"); + let required = contract.finalization.min_agreed_issues as usize; + if agreed < required { + if required == 1 { + bail!( + "strict negotiate policy requires at least one AGREED issue before finalization" + ); + } + bail!( + "strict negotiate policy requires at least {} AGREED issues before finalization", + required + ); } Ok(()) } @@ -1285,3 +1535,132 @@ fn normalize_issue_key(input: &str) -> String { trimmed } } + +#[cfg(test)] +mod tests { + use super::*; + use rally_core::{AgentState, Config, SessionType}; + use serde_json::json; + + fn test_state(phase: SessionPhase) -> SessionState { + let now_ts = now(); + let mut agents = BTreeMap::new(); + agents.insert( + "agent-a".to_string(), + AgentState { + name: "agent-a".to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "analysis_pending".to_string(), + rounds: 0, + }, + ); + SessionState { + name: "test-session".to_string(), + session_type: SessionType::Workflow, + phase, + created_at: now_ts, + config: Config { + expected_agents: 1, + max_rounds: 3, + turn_timeout_secs: 300, + review_timeout_secs: 300, + }, + agents, + topic: None, + todo_path: None, + workspace: None, + workflow_id: Some("demo/negotiate".to_string()), + workflow_source: Some("buildtime".to_string()), + workflow_version: None, + workflow_state: Some(initial_workflow_state().expect("initial workflow state")), + } + } + + fn temp_session_dir(prefix: &str) -> PathBuf { + let path = std::env::temp_dir().join(format!( + "{}-{}", + prefix, + now().timestamp_nanos_opt().unwrap_or_default() + )); + fs::create_dir_all(path.join("analysis")).expect("create analysis dir"); + fs::create_dir_all(path.join("issues")).expect("create issues dir"); + path + } + + #[test] + fn legacy_state_deserializes_with_new_tracking_defaults() -> Result<()> { + let legacy = json!({ + "state_version": 1, + "turn": null, + "issues": [], + "finalization": { + "writer": null, + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + } + }); + let parsed = parse_workflow_state(Some(legacy))?; + assert!(parsed.tracking.analysis_completed_agents.is_empty()); + assert!(parsed.tracking.draft_issues.is_empty()); + assert!(parsed.tracking.issues_with_challenges.is_empty()); + Ok(()) + } + + #[test] + fn strict_policy_contract_covers_negotiate_constraints() { + let policy = StrictNegotiationPolicy; + let contract = policy.contract(); + assert_eq!(contract.registration.min_agents, 3); + assert!(contract.registration.enforce_unique_identity); + assert!(contract.registration.fail_closed_missing_peers); + assert!(contract.draft_issue_phase.enable_private_drafts); + assert!(contract.draft_issue_phase.merge_before_negotiation); + assert!(contract.agreement.require_challenge_before_agree); + assert!(contract.agreement.disallow_self_agreement); + assert_eq!(contract.agreement.min_cross_agent_challenges, 2); + assert!(contract.finalization.require_writer_reviewer_separation); + assert!(contract.finalization.require_coverage_audit); + assert!(contract.finalization.require_final_doc); + assert_eq!(contract.finalization.min_agreed_issues, 1); + } + + #[test] + fn tracking_fields_capture_analysis_drafts_and_challenge_presence() -> Result<()> { + let dir = temp_session_dir("rally-plan-tracking-test"); + let mut state = test_state(SessionPhase::Analysis); + + fs::write( + dir.join("analysis").join("agent-a.md"), + "# Analysis\n\nDrafted independently.\n", + )?; + let _ = file_issue( + &mut state, + &dir, + "agent-a", + "Draft tracking issue", + None, + None, + )?; + let workflow_state = read_workflow_state(&state)?; + assert_eq!(workflow_state.tracking.draft_issues.len(), 1); + assert!(workflow_state.tracking.analysis_completed_agents.is_empty()); + + let _ = mark_done(&mut state, "agent-a", &dir)?; + let workflow_state = read_workflow_state(&state)?; + assert!( + workflow_state + .tracking + .analysis_completed_agents + .contains("agent-a") + ); + + let _ = challenge_issue(&mut state, &dir, "agent-a", 1)?; + let workflow_state = read_workflow_state(&state)?; + assert!(workflow_state.tracking.issues_with_challenges.contains(&1)); + + let _ = fs::remove_dir_all(&dir); + Ok(()) + } +} From 8da6681cecfc31d7c51c006ae8a22f85829f49a6 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:01:32 -0600 Subject: [PATCH 02/11] Enforce strict negotiate registration constraints --- crates/rally-workflow-plan/src/lib.rs | 207 +++++++++++++++++++++++++- src/workflow/builtin.rs | 4 +- 2 files changed, 207 insertions(+), 4 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index fe11ba6..5abf4c3 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -185,8 +185,18 @@ pub struct DraftIssueState { pub created_at: DateTime, } +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct RegistrationTrackingState { + #[serde(default)] + pub canonical_agent_bindings: BTreeMap, + #[serde(default)] + pub fallback_enabled: bool, +} + #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NegotiationTrackingState { + #[serde(default)] + pub registration: RegistrationTrackingState, #[serde(default)] pub analysis_completed_agents: BTreeSet, #[serde(default)] @@ -258,6 +268,11 @@ impl PlanWorkflowState { bail!("workflow_state draft issue ids must be unique"); } } + for (canonical, agent) in &self.tracking.registration.canonical_agent_bindings { + if canonical.trim().is_empty() || agent.trim().is_empty() { + bail!("workflow_state registration bindings cannot contain empty values"); + } + } Ok(()) } } @@ -1304,8 +1319,10 @@ impl PlanEngine

{ Self { policy } } - pub fn join(&self, state: &mut SessionState) -> Result<()> { + pub fn join(&self, state: &mut SessionState, agent: &str) -> Result<()> { self.policy.allow_registration(state)?; + self.enforce_registration_identity(state, agent)?; + self.enforce_registration_window(state)?; if state.phase == SessionPhase::Registration && state.agents.len() as u32 >= state.config.expected_agents { @@ -1320,6 +1337,7 @@ impl PlanEngine

{ session_dir: &Path, agent: &str, ) -> Result { + self.enforce_registration_window(state)?; refresh_negotiation_state(state, session_dir)?; enforce_turn_timeout(state)?; let _ = prepare_instruction(state, agent)?; @@ -1440,6 +1458,71 @@ impl PlanEngine

{ Ok(lines) } + + fn enforce_registration_identity(&self, state: &mut SessionState, agent: &str) -> Result<()> { + if state.phase != SessionPhase::Registration { + return Ok(()); + } + let contract = self.policy.contract(); + if !contract.registration.enforce_unique_identity { + return Ok(()); + } + + let mut workflow_state = read_workflow_state(state)?; + let changed = register_participant_identity(&mut workflow_state, agent)?; + if changed { + write_workflow_state(state, &workflow_state)?; + } + Ok(()) + } + + fn enforce_registration_window(&self, state: &mut SessionState) -> Result<()> { + if state.phase != SessionPhase::Registration { + return Ok(()); + } + + let contract = self.policy.contract(); + if !contract.registration.fail_closed_missing_peers { + return Ok(()); + } + + let joined = state.agents.len() as u32; + let expected = state.config.expected_agents; + if joined >= expected { + return Ok(()); + } + + let deadline = registration_deadline(state); + if now() < deadline { + return Ok(()); + } + + let mut workflow_state = read_workflow_state(state)?; + if !workflow_state.tracking.registration.fallback_enabled + && registration_fallback_requested() + { + workflow_state.tracking.registration.fallback_enabled = true; + write_workflow_state(state, &workflow_state)?; + } + if contract.registration.allow_single_agent_fallback + || workflow_state.tracking.registration.fallback_enabled + { + if joined > 0 { + state.phase = SessionPhase::Proposal; + } + return Ok(()); + } + + let missing = expected.saturating_sub(joined); + bail!( + "registration failed closed at {}: only {}/{} agents joined (missing {}). \ +recover by recreating with a longer --review-timeout-secs, or explicitly opt in fallback with RALLY_REGISTRATION_FALLBACK=1 and rerun join/next", + deadline.to_rfc3339(), + joined, + expected, + missing + ); + } } pub fn proposal_instruction( @@ -1536,6 +1619,67 @@ fn normalize_issue_key(input: &str) -> String { } } +fn register_participant_identity( + workflow_state: &mut PlanWorkflowState, + agent: &str, +) -> Result { + let canonical = normalize_agent_identity(agent); + if canonical.is_empty() { + bail!( + "registration rejected agent '{}': canonical identity was empty; choose a non-empty --as value", + agent + ); + } + + if let Some(existing) = workflow_state + .tracking + .registration + .canonical_agent_bindings + .get(&canonical) + { + if existing != agent { + bail!( + "registration rejected '{}': participant identity '{}' is already registered as '{}'. \ +recover by joining as '{}' or using a distinct identity", + agent, + canonical, + existing, + existing + ); + } + return Ok(false); + } + + workflow_state + .tracking + .registration + .canonical_agent_bindings + .insert(canonical, agent.to_string()); + Ok(true) +} + +fn normalize_agent_identity(input: &str) -> String { + normalize_issue_key(input) +} + +fn registration_deadline(state: &SessionState) -> DateTime { + state.created_at + Duration::seconds(state.config.review_timeout_secs as i64) +} + +fn registration_fallback_requested() -> bool { + std::env::var("RALLY_REGISTRATION_FALLBACK") + .ok() + .map(|value| truthy_env(&value)) + .unwrap_or(false) +} + +fn truthy_env(value: &str) -> bool { + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) +} + #[cfg(test)] mod tests { use super::*; @@ -1543,6 +1687,10 @@ mod tests { use serde_json::json; fn test_state(phase: SessionPhase) -> SessionState { + test_state_with_expected_agents(phase, 1) + } + + fn test_state_with_expected_agents(phase: SessionPhase, expected_agents: u32) -> SessionState { let now_ts = now(); let mut agents = BTreeMap::new(); agents.insert( @@ -1561,7 +1709,7 @@ mod tests { phase, created_at: now_ts, config: Config { - expected_agents: 1, + expected_agents, max_rounds: 3, turn_timeout_secs: 300, review_timeout_secs: 300, @@ -1663,4 +1811,59 @@ mod tests { let _ = fs::remove_dir_all(&dir); Ok(()) } + + #[test] + fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + let engine = PlanEngine::new(StrictNegotiationPolicy); + + engine.join(&mut state, "agent-a")?; + + let now_ts = now(); + state.agents.insert( + "agent_a".to_string(), + AgentState { + name: "agent_a".to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "joined".to_string(), + rounds: 0, + }, + ); + let err = engine + .join(&mut state, "agent_a") + .expect_err("duplicate canonical identity should fail"); + assert!(err.to_string().contains("identity 'agent-a'")); + Ok(()) + } + + #[test] + fn strict_registration_fails_closed_after_deadline_without_fallback() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + state.created_at = now() - Duration::seconds(600); + state.config.review_timeout_secs = 1; + let engine = PlanEngine::new(StrictNegotiationPolicy); + + let err = engine + .join(&mut state, "agent-a") + .expect_err("expired registration should fail closed"); + assert!(err.to_string().contains("registration failed closed")); + assert!(err.to_string().contains("RALLY_REGISTRATION_FALLBACK=1")); + Ok(()) + } + + #[test] + fn strict_registration_allows_explicit_fallback_flag() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + state.created_at = now() - Duration::seconds(600); + state.config.review_timeout_secs = 1; + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.tracking.registration.fallback_enabled = true; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let engine = PlanEngine::new(StrictNegotiationPolicy); + engine.join(&mut state, "agent-a")?; + assert_eq!(state.phase, SessionPhase::Proposal); + Ok(()) + } } diff --git a/src/workflow/builtin.rs b/src/workflow/builtin.rs index 6cd9548..1301ca8 100644 --- a/src/workflow/builtin.rs +++ b/src/workflow/builtin.rs @@ -39,7 +39,7 @@ impl Workflow for PlanWorkflow { fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { let engine = PlanEngine::new(DefaultPlanPolicy); - engine.join(ctx.host.state)?; + engine.join(ctx.host.state, ctx.agent)?; Ok(JoinDispatch::default()) } @@ -111,7 +111,7 @@ impl Workflow for ComposedNegotiateWorkflow { fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { let engine = PlanEngine::new(StrictNegotiationPolicy); - engine.join(ctx.host.state)?; + engine.join(ctx.host.state, ctx.agent)?; Ok(JoinDispatch::default()) } From 9d09e374735ba34ec53086be7d52057aebdd7d7a Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:06:23 -0600 Subject: [PATCH 03/11] Add private analysis drafts and deterministic merge --- crates/rally-workflow-plan/src/lib.rs | 265 +++++++++++++++++++++++--- tests/extensibility_mvp.rs | 14 ++ 2 files changed, 249 insertions(+), 30 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 5abf4c3..61d3af9 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -182,6 +182,12 @@ pub struct DraftIssueState { pub slug: String, pub author: String, pub file: String, + #[serde(default = "default_draft_question")] + pub question: String, + #[serde(default = "default_draft_context")] + pub context: String, + #[serde(default)] + pub merged_issue_id: Option, pub created_at: DateTime, } @@ -252,6 +258,14 @@ fn default_registration_min_agents() -> u32 { 1 } +fn default_draft_question() -> String { + "".to_string() +} + +fn default_draft_context() -> String { + "".to_string() +} + impl PlanWorkflowState { pub fn validate(&self) -> Result<()> { if let Some(turn) = &self.turn @@ -264,6 +278,9 @@ impl PlanWorkflowState { if draft.author.trim().is_empty() { bail!("workflow_state draft issue author cannot be empty"); } + if draft.title.trim().is_empty() { + bail!("workflow_state draft issue title cannot be empty"); + } if !seen_drafts.insert(draft.draft_id) { bail!("workflow_state draft issue ids must be unique"); } @@ -496,6 +513,46 @@ pub fn file_issue( bail!("file-issue is only valid during analysis or finalization review"); } + let question = question.unwrap_or("").to_string(); + let context = context.unwrap_or("").to_string(); + let created_at = now(); + let slug = slugify(title); + if state.phase == SessionPhase::Analysis { + let next_draft_id = workflow_state + .tracking + .draft_issues + .iter() + .map(|d| d.draft_id) + .max() + .unwrap_or(0) + + 1; + let file_name = format!("{:02}-{slug}.md", next_draft_id); + let relative = PathBuf::from("analysis") + .join("drafts") + .join(agent) + .join(&file_name); + let draft_path = session_dir.join(&relative); + if let Some(parent) = draft_path.parent() { + fs::create_dir_all(parent)?; + } + let template = draft_issue_template(next_draft_id, title, &question, &context); + fs::write(&draft_path, template)?; + + workflow_state.tracking.draft_issues.push(DraftIssueState { + draft_id: next_draft_id, + title: title.to_string(), + slug, + author: agent.to_string(), + file: relative.display().to_string(), + question, + context, + merged_issue_id: None, + created_at, + }); + write_workflow_state(state, &workflow_state)?; + return Ok(draft_path.display().to_string()); + } + let next_id = workflow_state .issues .iter() @@ -503,18 +560,9 @@ pub fn file_issue( .max() .unwrap_or(0) + 1; - let slug = slugify(title); let file_name = format!("{:02}-{slug}.md", next_id); let issue_path = session_dir.join("issues").join(&file_name); - - let question = question.unwrap_or(""); - let context = context.unwrap_or(""); - let created_at = now(); - let template = format!( - "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", - next_id, title, title, question, context - ); - + let template = issue_template(next_id, title, &question, &context); fs::write(&issue_path, template)?; workflow_state.issues.push(IssueState { @@ -529,25 +577,13 @@ pub fn file_issue( agreed_by: None, created_at, }); - if state.phase == SessionPhase::Analysis { - workflow_state.tracking.draft_issues.push(DraftIssueState { - draft_id: next_id, - title: title.to_string(), - slug: slugify(title), - author: agent.to_string(), - file: file_name.clone(), - created_at, - }); - } - if state.phase == SessionPhase::FinalizationReview { - workflow_state.finalization.reopened_for_issues = true; - state.phase = SessionPhase::Negotiation; - for agent_state in state.agents.values_mut() { - agent_state.rounds = 0; - } - initialize_turn(state, &mut workflow_state)?; + workflow_state.finalization.reopened_for_issues = true; + state.phase = SessionPhase::Negotiation; + for agent_state in state.agents.values_mut() { + agent_state.rounds = 0; } + initialize_turn(state, &mut workflow_state)?; write_workflow_state(state, &workflow_state)?; Ok(issue_path.display().to_string()) @@ -709,10 +745,10 @@ pub fn mark_done(state: &mut SessionState, agent: &str, session_dir: &Path) -> R .values() .all(|a| a.phase_status == "analysis_done") { - renumber_issues_interleaved(&mut workflow_state, session_dir)?; + merge_analysis_drafts_into_issues(&mut workflow_state, session_dir)?; state.phase = SessionPhase::Negotiation; initialize_turn(state, &mut workflow_state)?; - "analysis phase complete; issues normalized; advanced to negotiation".to_string() + "analysis phase complete; drafts merged; advanced to negotiation".to_string() } else { "marked analysis done".to_string() } @@ -1012,7 +1048,63 @@ fn maybe_resolve_negotiation( changed } -fn renumber_issues_interleaved( +fn merge_analysis_drafts_into_issues( + workflow_state: &mut PlanWorkflowState, + session_dir: &Path, +) -> Result<()> { + if workflow_state.tracking.draft_issues.is_empty() { + return interleave_existing_issues_by_author(workflow_state, session_dir); + } + + let interleaved = interleave_drafts_by_author(&workflow_state.tracking.draft_issues); + let issues_dir = session_dir.join("issues"); + fs::create_dir_all(&issues_dir)?; + if issues_dir.exists() { + for entry in fs::read_dir(&issues_dir)? { + let entry = entry?; + if !entry.file_type()?.is_file() { + continue; + } + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) == Some("md") { + fs::remove_file(path)?; + } + } + } + + let mut merged_ids = BTreeMap::new(); + let mut merged_issues = Vec::with_capacity(interleaved.len()); + for (idx, draft) in interleaved.into_iter().enumerate() { + let issue_id = (idx + 1) as u32; + let file_name = format!("{:02}-{}.md", issue_id, draft.slug); + let issue_path = issues_dir.join(&file_name); + let template = issue_template(issue_id, &draft.title, &draft.question, &draft.context); + fs::write(&issue_path, template)?; + + merged_ids.insert(draft.draft_id, issue_id); + merged_issues.push(IssueState { + id: issue_id, + slug: draft.slug, + file: file_name, + title: draft.title, + author: draft.author, + status: IssueStatus::Open, + positions: Default::default(), + challenges: Default::default(), + agreed_by: None, + created_at: draft.created_at, + }); + } + + workflow_state.issues = merged_issues; + for draft in &mut workflow_state.tracking.draft_issues { + draft.merged_issue_id = merged_ids.get(&draft.draft_id).copied(); + } + let _ = recompute_challenge_tracking(workflow_state); + Ok(()) +} + +fn interleave_existing_issues_by_author( workflow_state: &mut PlanWorkflowState, session_dir: &Path, ) -> Result<()> { @@ -1059,6 +1151,37 @@ fn renumber_issues_interleaved( Ok(()) } +fn interleave_drafts_by_author(drafts: &[DraftIssueState]) -> Vec { + let mut drafts_sorted = drafts.to_vec(); + drafts_sorted.sort_by_key(|draft| draft.draft_id); + + let mut by_author: BTreeMap> = BTreeMap::new(); + for draft in drafts_sorted { + by_author + .entry(draft.author.clone()) + .or_default() + .push_back(draft); + } + + let mut interleaved = Vec::with_capacity(drafts.len()); + loop { + let mut progressed = false; + let authors = by_author.keys().cloned().collect::>(); + for author in authors { + let queue = by_author.get_mut(&author).expect("author key must exist"); + if let Some(draft) = queue.pop_front() { + interleaved.push(draft); + progressed = true; + } + } + if !progressed { + break; + } + } + + interleaved +} + fn ensure_turn_holder( state: &SessionState, workflow_state: &PlanWorkflowState, @@ -1089,6 +1212,20 @@ fn issue_dir_path(session_dir: &Path, issue_id: u32, slug: &str) -> PathBuf { .join(format!("{:02}-{slug}", issue_id)) } +fn issue_template(issue_id: u32, title: &str, question: &str, context: &str) -> String { + format!( + "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", + issue_id, title, title, question, context + ) +} + +fn draft_issue_template(draft_id: u32, title: &str, question: &str, context: &str) -> String { + format!( + "# Draft Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Candidate Positions\n\n", + draft_id, title, title, question, context + ) +} + fn sync_issue_challenge_tracking(workflow_state: &mut PlanWorkflowState, issue_id: u32) { if let Some(issue) = workflow_state .issues @@ -1246,6 +1383,11 @@ impl PlanPolicy for StrictNegotiationPolicy { .issues .iter() .any(|issue| normalize_issue_key(&issue.title) == normalized) + || workflow_state + .tracking + .draft_issues + .iter() + .any(|draft| normalize_issue_key(&draft.title) == normalized) { bail!( "strict negotiate policy rejected duplicate issue title '{}'", @@ -1725,6 +1867,20 @@ mod tests { } } + fn add_agent(state: &mut SessionState, name: &str) { + let now_ts = now(); + state.agents.insert( + name.to_string(), + AgentState { + name: name.to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "analysis_pending".to_string(), + rounds: 0, + }, + ); + } + fn temp_session_dir(prefix: &str) -> PathBuf { let path = std::env::temp_dir().join(format!( "{}-{}", @@ -1812,6 +1968,55 @@ mod tests { Ok(()) } + #[test] + fn analysis_file_issue_keeps_issues_private_until_merge() -> Result<()> { + let dir = temp_session_dir("rally-plan-private-draft-test"); + let mut state = test_state(SessionPhase::Analysis); + let path = file_issue( + &mut state, + &dir, + "agent-a", + "Private draft", + Some("Why"), + Some("Because"), + )?; + assert!( + path.contains("/analysis/drafts/agent-a/"), + "draft path should stay under agent-specific analysis drafts" + ); + let workflow_state = read_workflow_state(&state)?; + assert!(workflow_state.issues.is_empty()); + assert_eq!(workflow_state.tracking.draft_issues.len(), 1); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn analysis_done_merges_drafts_interleaved_by_author() -> Result<()> { + let dir = temp_session_dir("rally-plan-merge-drafts-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Analysis, 2); + add_agent(&mut state, "agent-b"); + + fs::write(dir.join("analysis").join("agent-a.md"), "# A\n")?; + fs::write(dir.join("analysis").join("agent-b.md"), "# B\n")?; + + let _ = file_issue(&mut state, &dir, "agent-a", "A1", None, None)?; + let _ = file_issue(&mut state, &dir, "agent-a", "A2", None, None)?; + let _ = file_issue(&mut state, &dir, "agent-b", "B1", None, None)?; + + let _ = mark_done(&mut state, "agent-a", &dir)?; + let _ = mark_done(&mut state, "agent-b", &dir)?; + + let workflow_state = read_workflow_state(&state)?; + assert_eq!(state.phase, SessionPhase::Negotiation); + assert_eq!(workflow_state.issues.len(), 3); + assert_eq!(workflow_state.issues[0].author, "agent-a"); + assert_eq!(workflow_state.issues[1].author, "agent-b"); + assert_eq!(workflow_state.issues[2].author, "agent-a"); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + #[test] fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); diff --git a/tests/extensibility_mvp.rs b/tests/extensibility_mvp.rs index 5443868..909d2a8 100644 --- a/tests/extensibility_mvp.rs +++ b/tests/extensibility_mvp.rs @@ -462,6 +462,20 @@ fn composed_negotiate_reuses_templates_and_enforces_strict_policy() -> Result<() let mut state = handle.load_state()?; state.phase = SessionPhase::Negotiation; let mut plan_state = rally_workflow_plan::read_workflow_state(&state)?; + if plan_state.issues.is_empty() { + plan_state.issues.push(rally_workflow_plan::IssueState { + id: 1, + slug: "issue-merge-rule".to_string(), + file: "01-issue-merge-rule.md".to_string(), + title: "Issue Merge Rule".to_string(), + author: "agent-a".to_string(), + status: rally_workflow_plan::IssueStatus::Open, + positions: Default::default(), + challenges: Default::default(), + agreed_by: None, + created_at: state::now(), + }); + } let issue = plan_state .issues .iter_mut() From c4a075f94a7a90d9f232598ed78f5f4480954972 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:08:59 -0600 Subject: [PATCH 04/11] Gate analysis drafts behind strict policy --- crates/rally-workflow-plan/src/lib.rs | 46 +++++++++++++++++++++------ 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 61d3af9..9cc18bd 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -506,6 +506,7 @@ pub fn file_issue( title: &str, question: Option<&str>, context: Option<&str>, + use_private_drafts: bool, ) -> Result { let mut workflow_state = read_workflow_state(state)?; @@ -517,7 +518,7 @@ pub fn file_issue( let context = context.unwrap_or("").to_string(); let created_at = now(); let slug = slugify(title); - if state.phase == SessionPhase::Analysis { + if state.phase == SessionPhase::Analysis && use_private_drafts { let next_draft_id = workflow_state .tracking .draft_issues @@ -578,12 +579,14 @@ pub fn file_issue( created_at, }); - workflow_state.finalization.reopened_for_issues = true; - state.phase = SessionPhase::Negotiation; - for agent_state in state.agents.values_mut() { - agent_state.rounds = 0; + if state.phase == SessionPhase::FinalizationReview { + workflow_state.finalization.reopened_for_issues = true; + state.phase = SessionPhase::Negotiation; + for agent_state in state.agents.values_mut() { + agent_state.rounds = 0; + } + initialize_turn(state, &mut workflow_state)?; } - initialize_turn(state, &mut workflow_state)?; write_workflow_state(state, &workflow_state)?; Ok(issue_path.display().to_string()) @@ -1532,6 +1535,7 @@ impl PlanEngine

{ let title = self.policy.merge_issue_title(state, &title)?; let question = optional_string(args, "question"); let context = optional_string(args, "context"); + let contract = self.policy.contract(); let path = file_issue( state, session_dir, @@ -1539,6 +1543,7 @@ impl PlanEngine

{ &title, question.as_deref(), context.as_deref(), + contract.draft_issue_phase.enable_private_drafts, )?; format!("filed issue at {path}") } @@ -1946,6 +1951,7 @@ mod tests { "Draft tracking issue", None, None, + true, )?; let workflow_state = read_workflow_state(&state)?; assert_eq!(workflow_state.tracking.draft_issues.len(), 1); @@ -1979,6 +1985,7 @@ mod tests { "Private draft", Some("Why"), Some("Because"), + true, )?; assert!( path.contains("/analysis/drafts/agent-a/"), @@ -1991,6 +1998,27 @@ mod tests { Ok(()) } + #[test] + fn analysis_file_issue_without_private_drafts_writes_issue_immediately() -> Result<()> { + let dir = temp_session_dir("rally-plan-direct-issue-test"); + let mut state = test_state(SessionPhase::Analysis); + let path = file_issue( + &mut state, + &dir, + "agent-a", + "Direct issue", + Some("Q"), + Some("C"), + false, + )?; + assert!(path.contains("/issues/")); + let workflow_state = read_workflow_state(&state)?; + assert_eq!(workflow_state.issues.len(), 1); + assert!(workflow_state.tracking.draft_issues.is_empty()); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + #[test] fn analysis_done_merges_drafts_interleaved_by_author() -> Result<()> { let dir = temp_session_dir("rally-plan-merge-drafts-test"); @@ -2000,9 +2028,9 @@ mod tests { fs::write(dir.join("analysis").join("agent-a.md"), "# A\n")?; fs::write(dir.join("analysis").join("agent-b.md"), "# B\n")?; - let _ = file_issue(&mut state, &dir, "agent-a", "A1", None, None)?; - let _ = file_issue(&mut state, &dir, "agent-a", "A2", None, None)?; - let _ = file_issue(&mut state, &dir, "agent-b", "B1", None, None)?; + let _ = file_issue(&mut state, &dir, "agent-a", "A1", None, None, true)?; + let _ = file_issue(&mut state, &dir, "agent-a", "A2", None, None, true)?; + let _ = file_issue(&mut state, &dir, "agent-b", "B1", None, None, true)?; let _ = mark_done(&mut state, "agent-a", &dir)?; let _ = mark_done(&mut state, "agent-b", &dir)?; From e29368d24a0c32f3628a69ba79962f246a7f3993 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:10:58 -0600 Subject: [PATCH 05/11] Add position action and enforce negotiation gates --- crates/rally-workflow-plan/src/lib.rs | 155 +++++++++++++++++++++++++- src/workflow/builtin.rs | 4 +- 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 9cc18bd..c3f18e9 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -646,6 +646,55 @@ pub fn challenge_issue( )) } +pub fn position_issue( + state: &mut SessionState, + session_dir: &Path, + agent: &str, + issue_id: u32, +) -> Result { + let mut workflow_state = read_workflow_state(state)?; + ensure_turn_holder(state, &workflow_state, agent)?; + refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + + let (current_issue_id, position) = { + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } + + let current_issue_id = issue.id; + let position = position_path(session_dir, issue, agent); + if let Some(parent) = position.parent() { + fs::create_dir_all(parent)?; + } + + if !position.exists() { + fs::write( + &position, + format!( + "# Position for issue {:02}\n\n## Position\n- Add your position details here.\n", + current_issue_id + ), + )?; + } + + issue.positions.insert(agent.to_string()); + (current_issue_id, position) + }; + + let position_display = position.display().to_string(); + write_workflow_state(state, &workflow_state)?; + Ok(format!( + "position recorded for issue {:02} at {}", + current_issue_id, position_display + )) +} + pub fn agree_issue( state: &mut SessionState, session_dir: &Path, @@ -1551,6 +1600,10 @@ impl PlanEngine

{ let issue = required_u32(args, "issue", name)?; challenge_issue(state, session_dir, agent, issue)? } + "position" => { + let issue = required_u32(args, "issue", name)?; + position_issue(state, session_dir, agent, issue)? + } "agree" => { let issue = required_u32(args, "issue", name)?; self.policy.allow_agree(state, issue, agent)?; @@ -1558,7 +1611,7 @@ impl PlanEngine

{ } _ => { bail!( - "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, challenge, agree", + "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, position, challenge, agree", name, self.policy.id() ) @@ -1897,6 +1950,21 @@ mod tests { path } + fn open_issue(id: u32, author: &str, title: &str) -> IssueState { + IssueState { + id, + slug: slugify(title), + file: format!("{:02}-{}.md", id, slugify(title)), + title: title.to_string(), + author: author.to_string(), + status: IssueStatus::Open, + positions: BTreeSet::new(), + challenges: BTreeSet::new(), + agreed_by: None, + created_at: now(), + } + } + #[test] fn legacy_state_deserializes_with_new_tracking_defaults() -> Result<()> { let legacy = json!({ @@ -2045,6 +2113,91 @@ mod tests { Ok(()) } + #[test] + fn position_action_records_position_without_challenge() -> Result<()> { + let dir = temp_session_dir("rally-plan-position-action-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Position action issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let _ = position_issue(&mut state, &dir, "agent-a", 1)?; + let workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + assert!(issue.positions.contains("agent-a")); + assert!(issue.challenges.is_empty()); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn agree_requires_cross_agent_position_and_challenge() -> Result<()> { + let dir = temp_session_dir("rally-plan-agree-gates-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Agreement gate issue"); + issue.positions.insert("agent-a".to_string()); + issue.challenges.insert("agent-a".to_string()); + workflow_state.issues = vec![issue]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = agree_issue(&mut state, &dir, "agent-a", 1) + .expect_err("agreement should fail with only self positions"); + assert!(err.to_string().contains("only your own positions")); + + let mut workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter_mut() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + issue.positions.insert("agent-b".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = agree_issue(&mut state, &dir, "agent-a", 1) + .expect_err("agreement should fail without cross-agent challenge"); + assert!(err.to_string().contains("no challenge from another agent")); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn unresolved_issues_escalate_after_round_limit() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + state.config.max_rounds = 1; + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Escalation issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + + advance_turn(&mut state, &mut workflow_state)?; + assert_eq!(state.phase, SessionPhase::Negotiation); + advance_turn(&mut state, &mut workflow_state)?; + assert_eq!(state.phase, SessionPhase::FinalizationWrite); + assert_eq!(workflow_state.issues[0].status, IssueStatus::Escalated); + Ok(()) + } + #[test] fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); diff --git a/src/workflow/builtin.rs b/src/workflow/builtin.rs index 1301ca8..a8cb121 100644 --- a/src/workflow/builtin.rs +++ b/src/workflow/builtin.rs @@ -34,7 +34,7 @@ impl Workflow for PlanWorkflow { } fn supported_actions(&self) -> &'static [&'static str] { - &["file-issue", "challenge", "agree"] + &["file-issue", "position", "challenge", "agree"] } fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { @@ -91,7 +91,7 @@ impl Workflow for ComposedNegotiateWorkflow { } fn supported_actions(&self) -> &'static [&'static str] { - &["file-issue", "challenge", "agree"] + &["file-issue", "position", "challenge", "agree"] } fn on_create(&self, ctx: &mut super::CreateContext<'_>) -> Result { From ef00f95e632bd9404d90f935abc2c27ae50c51d8 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:12:40 -0600 Subject: [PATCH 06/11] Harden finalization approval and artifact output --- crates/rally-workflow-plan/src/lib.rs | 69 +++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index c3f18e9..ec52f54 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -895,10 +895,17 @@ fn finalize_and_complete( approver: &str, mode: &str, ) -> Result { + let coverage = session_dir.join("coverage-audit.md"); + if !coverage.exists() { + bail!("missing {}", coverage.display()); + } let final_doc = session_dir.join("final.md"); if !final_doc.exists() { bail!("missing {}", final_doc.display()); } + parse_steps_from_todo(&final_doc).map_err(|err| { + anyhow!("final.md must be write-todo format (## Spec + ## Plan with numbered steps): {err}") + })?; let todo_doc = session_dir.join("todo.md"); fs::copy(&final_doc, &todo_doc)?; workflow_state.finalization.reviewer = Some(approver.to_string()); @@ -1965,6 +1972,10 @@ mod tests { } } + fn valid_final_doc() -> &'static str { + "## Spec\n\nfinalize\n\n## Plan\n\n1. Ship\nAcceptance criteria:\n- done\n" + } + #[test] fn legacy_state_deserializes_with_new_tracking_defaults() -> Result<()> { let legacy = json!({ @@ -2198,6 +2209,64 @@ mod tests { Ok(()) } + #[test] + fn finalization_write_requires_artifacts_and_valid_format() -> Result<()> { + let dir = temp_session_dir("rally-plan-finalization-write-test"); + let mut state = test_state_with_expected_agents(SessionPhase::FinalizationWrite, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.finalization.writer = Some("agent-a".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("finalization write should require coverage-audit.md"); + assert!(err.to_string().contains("coverage-audit.md")); + + fs::write(dir.join("coverage-audit.md"), "coverage")?; + fs::write(dir.join("final.md"), "not a todo format")?; + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("final.md should require write-todo format"); + assert!(err.to_string().contains("write-todo format")); + + fs::write(dir.join("final.md"), valid_final_doc())?; + let message = mark_done(&mut state, "agent-a", &dir)?; + assert!(message.contains("advanced to review")); + assert_eq!(state.phase, SessionPhase::FinalizationReview); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn finalization_review_requires_distinct_reviewer_and_writes_todo() -> Result<()> { + let dir = temp_session_dir("rally-plan-finalization-review-test"); + let mut state = test_state_with_expected_agents(SessionPhase::FinalizationReview, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.finalization.writer = Some("agent-a".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + fs::write(dir.join("coverage-audit.md"), "coverage")?; + fs::write(dir.join("final.md"), valid_final_doc())?; + + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("writer should not be able to self-approve in multi-agent sessions"); + assert!(err.to_string().contains("cannot self-approve")); + + let message = mark_done(&mut state, "agent-b", &dir)?; + assert!(message.contains("completed session")); + assert_eq!(state.phase, SessionPhase::Done); + assert!(dir.join("todo.md").exists()); + let todo = fs::read_to_string(dir.join("todo.md"))?; + assert_eq!(todo, valid_final_doc()); + let workflow_state = read_workflow_state(&state)?; + assert_eq!( + workflow_state.finalization.reviewer.as_deref(), + Some("agent-b") + ); + assert!(workflow_state.finalization.todo_ready); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + #[test] fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); From b27b38d298b11bc81d44a23379d8c559ecb63f33 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:14:52 -0600 Subject: [PATCH 07/11] Polish negotiate instructions status and action errors --- crates/rally-workflow-plan/src/lib.rs | 194 ++++++++++++++++++++------ 1 file changed, 150 insertions(+), 44 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index ec52f54..1e3d39d 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -397,6 +397,7 @@ pub fn get_instruction(state: &SessionState, agent: &str, session_dir: &Path) -> } let mut lines = Vec::new(); + lines.push("Phase: Negotiation".to_string()); lines.push(format!( "It is your turn in negotiation (round {}).", turn.round @@ -1585,47 +1586,56 @@ impl PlanEngine

{ name: &str, args: &Value, ) -> Result { - let message = match name { - "file-issue" => { - let title = required_string(args, "title", name)?; - let title = self.policy.merge_issue_title(state, &title)?; - let question = optional_string(args, "question"); - let context = optional_string(args, "context"); - let contract = self.policy.contract(); - let path = file_issue( - state, - session_dir, - agent, - &title, - question.as_deref(), - context.as_deref(), - contract.draft_issue_phase.enable_private_drafts, - )?; - format!("filed issue at {path}") - } - "challenge" => { - let issue = required_u32(args, "issue", name)?; - challenge_issue(state, session_dir, agent, issue)? - } - "position" => { - let issue = required_u32(args, "issue", name)?; - position_issue(state, session_dir, agent, issue)? - } - "agree" => { - let issue = required_u32(args, "issue", name)?; - self.policy.allow_agree(state, issue, agent)?; - agree_issue(state, session_dir, agent, issue)? - } - _ => { - bail!( - "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, position, challenge, agree", - name, - self.policy.id() - ) + let message = (|| -> Result { + match name { + "file-issue" => { + let title = required_string(args, "title", name)?; + let title = self.policy.merge_issue_title(state, &title)?; + let question = optional_string(args, "question"); + let context = optional_string(args, "context"); + let contract = self.policy.contract(); + let path = file_issue( + state, + session_dir, + agent, + &title, + question.as_deref(), + context.as_deref(), + contract.draft_issue_phase.enable_private_drafts, + )?; + Ok(format!("filed issue at {path}")) + } + "challenge" => { + let issue = required_u32(args, "issue", name)?; + challenge_issue(state, session_dir, agent, issue) + } + "position" => { + let issue = required_u32(args, "issue", name)?; + position_issue(state, session_dir, agent, issue) + } + "agree" => { + let issue = required_u32(args, "issue", name)?; + self.policy.allow_agree(state, issue, agent)?; + agree_issue(state, session_dir, agent, issue) + } + _ => { + bail!( + "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, position, challenge, agree", + name, + self.policy.id() + ) + } } - }; - - Ok(message) + })(); + + message.map_err(|err| { + anyhow!( + "action '{}' failed: {}. Supported actions: file-issue, position, challenge, agree. Preconditions: {}", + name, + err, + action_preconditions(name) + ) + }) } pub fn status(&self, state: &SessionState) -> Result> { @@ -1662,6 +1672,21 @@ impl PlanEngine

{ "Issue summary: {} open, {} agreed, {} escalated", open, agreed, escalated )); + lines.push(format!( + "Finalization: writer={} reviewer={} todo_ready={} reopened_for_issues={}", + workflow_state + .finalization + .writer + .as_deref() + .unwrap_or("unassigned"), + workflow_state + .finalization + .reviewer + .as_deref() + .unwrap_or("pending"), + workflow_state.finalization.todo_ready, + workflow_state.finalization.reopened_for_issues + )); Ok(lines) } @@ -1739,7 +1764,7 @@ pub fn proposal_instruction( agent: &str, ) -> String { format!( - "Write your proposal to: {proposal_path}\nSession topic: {topic}\nAfter writing it, run: rally done --session {session} --as {agent}" + "Phase: Proposal\nSession topic: {topic}\nRequired file:\n- {proposal_path}\nNext command:\n- rally done --session {session} --as {agent}" ) } @@ -1750,7 +1775,7 @@ pub fn analysis_instruction( agent: &str, ) -> String { format!( - "Read proposal files:\n{proposals}\nWrite divergence analysis to {analysis_path}.\nFile issues with: rally plan file-issue --session {session} --as {agent} --title \"...\"\nWhen complete, run: rally done --session {session} --as {agent}" + "Phase: Analysis\nRead proposal files:\n{proposals}\nRequired file:\n- {analysis_path}\nOptional action:\n- rally plan file-issue --session {session} --as {agent} --title \"...\"\nNext command when analysis is complete:\n- rally done --session {session} --as {agent}" ) } @@ -1761,7 +1786,7 @@ pub fn finalization_write_instruction( agent: &str, ) -> String { format!( - "Write final artifacts: {coverage_path} and {final_path}.\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nThen run: rally done --session {session} --as {agent}" + "Phase: FinalizationWrite\nRequired files:\n- {coverage_path}\n- {final_path}\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nNext command:\n- rally done --session {session} --as {agent}" ) } @@ -1772,10 +1797,24 @@ pub fn finalization_review_instruction( agent: &str, ) -> String { format!( - "Review {coverage_path} and {final_path}. If approved, run rally done --session {session} --as {agent}. If rejected, file issues with rally plan file-issue, which returns the session to negotiation." + "Phase: FinalizationReview\nRequired files to review:\n- {coverage_path}\n- {final_path}\nIf approved, run:\n- rally done --session {session} --as {agent}\nIf changes are needed, file issue(s) and return to negotiation:\n- rally plan file-issue --session {session} --as {agent} --title \"...\"" ) } +fn action_preconditions(name: &str) -> &'static str { + match name { + "file-issue" => { + "requires analysis (or finalization review) phase; `title` is required in args-json" + } + "position" => "requires negotiation phase, your active turn, and an open issue id", + "challenge" => "requires negotiation phase, your active turn, and an open issue id", + "agree" => { + "requires negotiation phase, your active turn, an open issue id, plus cross-agent position and challenge prerequisites" + } + _ => "see supported actions list", + } +} + fn required_string(args: &Value, key: &str, action: &str) -> Result { args.get(key) .and_then(Value::as_str) @@ -2267,6 +2306,73 @@ mod tests { Ok(()) } + #[test] + fn status_includes_turn_issue_summary_and_finalization_details() -> Result<()> { + let state = test_state_with_expected_agents(SessionPhase::Negotiation, 1); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 2, + }); + workflow_state.finalization.writer = Some("agent-a".to_string()); + workflow_state.finalization.reviewer = Some("agent-b".to_string()); + workflow_state.finalization.todo_ready = true; + workflow_state.issues = vec![open_issue(1, "agent-a", "Status issue")]; + let mut state = state; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let lines = PlanEngine::new(DefaultPlanPolicy).status(&state)?; + assert!( + lines + .iter() + .any(|line| line.starts_with("Turn: holder=agent-a")) + ); + assert!( + lines + .iter() + .any(|line| line.contains("Issue summary: 1 open, 0 agreed, 0 escalated")) + ); + assert!( + lines + .iter() + .any(|line| line.contains("Finalization: writer=agent-a reviewer=agent-b")) + ); + Ok(()) + } + + #[test] + fn action_errors_include_supported_actions_and_preconditions() -> Result<()> { + let dir = temp_session_dir("rally-plan-action-errors-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Action error issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = PlanEngine::new(DefaultPlanPolicy) + .action(&mut state, &dir, "agent-a", "agree", &json!({})) + .expect_err("missing issue id should be reported with preconditions"); + let err_msg = err.to_string(); + assert!(err_msg.contains("Supported actions: file-issue, position, challenge, agree")); + assert!(err_msg.contains("Preconditions:")); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn instructions_include_phase_required_files_and_next_command() { + let proposal = proposal_instruction("/tmp/proposal.md", "topic", "session-a", "agent-a"); + assert!(proposal.contains("Phase: Proposal")); + assert!(proposal.contains("Required file:")); + assert!(proposal.contains("Next command:")); + } + #[test] fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); From 422cd086bb473a3e4218d234c9fb0415b5dbaca8 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:17:55 -0600 Subject: [PATCH 08/11] Add negotiate workflow invariant integration tests --- tests/extensibility_mvp.rs | 500 +++++++++++++++++++++++++++++++++++++ 1 file changed, 500 insertions(+) diff --git a/tests/extensibility_mvp.rs b/tests/extensibility_mvp.rs index 909d2a8..9029c02 100644 --- a/tests/extensibility_mvp.rs +++ b/tests/extensibility_mvp.rs @@ -2,6 +2,7 @@ use std::{ fs, path::{Path, PathBuf}, sync::atomic::{AtomicUsize, Ordering}, + thread, time::{SystemTime, UNIX_EPOCH}, }; @@ -541,6 +542,505 @@ fn composed_negotiate_reuses_templates_and_enforces_strict_policy() -> Result<() Ok(()) } +#[test] +fn negotiate_happy_path_runs_analysis_positions_and_finalization() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-happy-path-it"); + let session_dir = state::session_dir(&session); + let _ = fs::remove_dir_all(&session_dir); + + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + "--topic", + "happy path", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + + for agent in ["agent-a", "agent-b", "agent-c"] { + write_file( + &session_dir + .join("sources") + .join(format!("{agent}-proposal.md")), + &format!("# {agent} proposal\n"), + ); + run_cli( + &["rally", "done", "--session", &session, "--as", agent], + ®istry, + )?; + } + + write_file( + &session_dir.join("analysis").join("agent-a.md"), + "# analysis a\n", + ); + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "file-issue", + "--args-json", + r#"{"title":"Happy path issue"}"#, + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + write_file( + &session_dir.join("analysis").join(format!("{agent}.md")), + &format!("# {agent} analysis\n"), + ); + run_cli( + &["rally", "done", "--session", &session, "--as", agent], + ®istry, + )?; + } + + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "position", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-a"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-b", + "--name", + "challenge", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-b"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-c", + "--name", + "challenge", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-c"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "agree", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + + assert_eq!( + commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "agent-a".to_string(), + timeout: Some(2), + }, + ®istry + )?, + 0 + ); + write_file( + &session_dir.join("coverage-audit.md"), + "# coverage\n- issue 1\n", + ); + write_file(&session_dir.join("final.md"), plan_final_doc()); + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-a"], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-b"], + ®istry, + )?; + + let handle = SessionHandle::open(&session)?; + let state = handle.load_state()?; + assert_eq!(state.phase, SessionPhase::Done); + assert!(session_dir.join("todo.md").exists()); + assert_eq!( + fs::read_to_string(session_dir.join("todo.md"))?, + fs::read_to_string(session_dir.join("final.md"))? + ); + drop(handle); + let _ = fs::remove_dir_all(&session_dir); + Ok(()) +} + +#[test] +fn negotiate_failure_cases_cover_identity_peer_timeout_agree_and_approver() -> Result<()> { + let registry = builtin_registry()?; + + let dup_session = unique_name("rally-negotiate-dup-id-it"); + let dup_dir = state::session_dir(&dup_session); + let _ = fs::remove_dir_all(&dup_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &dup_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + commands::join_with_registry( + &JoinArgs { + session: dup_session.clone(), + agent: "agent-a".to_string(), + timeout: None, + }, + ®istry, + )?; + let err = commands::join_with_registry( + &JoinArgs { + session: dup_session.clone(), + agent: "agent_a".to_string(), + timeout: None, + }, + ®istry, + ) + .expect_err("duplicate canonical identity should be rejected"); + assert!(err.to_string().contains("identity 'agent-a'")); + let _ = fs::remove_dir_all(&dup_dir); + + let timeout_session = unique_name("rally-negotiate-timeout-it"); + let timeout_dir = state::session_dir(&timeout_session); + let _ = fs::remove_dir_all(&timeout_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &timeout_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + "--review-timeout-secs", + "1", + ], + ®istry, + )?; + let handle = SessionHandle::open(&timeout_session)?; + let mut state = handle.load_state()?; + state.created_at = state::now() - chrono::Duration::seconds(600); + handle.save_state(&state)?; + drop(handle); + let err = commands::join_with_registry( + &JoinArgs { + session: timeout_session.clone(), + agent: "agent-a".to_string(), + timeout: None, + }, + ®istry, + ) + .expect_err("missing peers should fail closed after timeout"); + assert!(err.to_string().contains("registration failed closed")); + let _ = fs::remove_dir_all(&timeout_dir); + + let fail_session = unique_name("rally-negotiate-failures-it"); + let fail_dir = state::session_dir(&fail_session); + let _ = fs::remove_dir_all(&fail_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &fail_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: fail_session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + let handle = SessionHandle::open(&fail_session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::Negotiation; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": { + "holder": "agent-a", + "started_at": state::now().to_rfc3339(), + "round": 1 + }, + "issues": [{ + "id": 1, + "slug": "failure-issue", + "file": "01-failure-issue.md", + "title": "Failure issue", + "author": "agent-a", + "status": "OPEN", + "positions": ["agent-a"], + "challenges": ["agent-a"], + "agreed_by": null, + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": "agent-a", + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &fail_session, + "--as", + "agent-a", + "--name", + "agree", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + ) + .expect_err("premature agree should fail"); + let err_text = err.to_string(); + assert!( + err_text.contains("cross-agent challenges") || err_text.contains("only your own positions") + ); + + let handle = SessionHandle::open(&fail_session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::FinalizationReview; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": null, + "issues": [{ + "id": 1, + "slug": "failure-issue", + "file": "01-failure-issue.md", + "title": "Failure issue", + "author": "agent-a", + "status": "AGREED", + "positions": ["agent-a", "agent-b"], + "challenges": ["agent-b", "agent-c"], + "agreed_by": "agent-a", + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": "agent-a", + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + write_file(&fail_dir.join("coverage-audit.md"), "# coverage\n"); + write_file(&fail_dir.join("final.md"), plan_final_doc()); + let err = run_cli( + &[ + "rally", + "done", + "--session", + &fail_session, + "--as", + "agent-a", + ], + ®istry, + ) + .expect_err("writer self-approval should be rejected"); + assert!(err.to_string().contains("cannot self-approve")); + let _ = fs::remove_dir_all(&fail_dir); + Ok(()) +} + +#[test] +fn invalid_concurrent_actions_do_not_corrupt_negotiate_state() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-concurrency-it"); + let session_dir = state::session_dir(&session); + let _ = fs::remove_dir_all(&session_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + let handle = SessionHandle::open(&session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::Negotiation; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": { + "holder": "agent-a", + "started_at": state::now().to_rfc3339(), + "round": 1 + }, + "issues": [{ + "id": 1, + "slug": "concurrency-issue", + "file": "01-concurrency-issue.md", + "title": "Concurrency issue", + "author": "agent-a", + "status": "OPEN", + "positions": [], + "challenges": [], + "agreed_by": null, + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": null, + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + + let state_path = session_dir.join("state.json"); + let before = fs::read_to_string(&state_path)?; + thread::scope(|scope| { + for agent in ["agent-b", "agent-c"] { + let session = session.clone(); + scope.spawn(move || { + let registry = builtin_registry().expect("registry"); + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + agent, + "--name", + "position", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + ) + .expect_err("non-turn-holder action should fail"); + assert!(err.to_string().contains("currently 'agent-a' turn")); + }); + } + }); + let after = fs::read_to_string(&state_path)?; + assert_eq!( + before, after, + "invalid concurrent actions must not mutate state" + ); + let _ = fs::remove_dir_all(&session_dir); + Ok(()) +} + #[test] fn create_workflow_rejects_builtin_workflow_ids() -> Result<()> { let registry = builtin_registry()?; From 2befc71c5cc4ab1eabe43123ea1031876c1e4072 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:19:07 -0600 Subject: [PATCH 09/11] Add user-run manual QA guide for negotiate workflow --- docs/manual-qa-negotiate-workflow.md | 58 ++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 docs/manual-qa-negotiate-workflow.md diff --git a/docs/manual-qa-negotiate-workflow.md b/docs/manual-qa-negotiate-workflow.md new file mode 100644 index 0000000..6b94664 --- /dev/null +++ b/docs/manual-qa-negotiate-workflow.md @@ -0,0 +1,58 @@ +# Manual QA: Negotiate Workflow + +Run from repository root using the built binary: + +`/Users/justin/code/rally/target/debug/rally` + +## 1. Create and Join + +1. Create session: +`/Users/justin/code/rally/target/debug/rally create workflow --name qa-negotiate-final --workflow demo/negotiate --agents 3 --topic "manual qa negotiate"` + +2. Join participants: +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-a` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-b` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-c` + +## 2. Validate Rule Enforcement + +1. Proposal + analysis: +- For each agent, use `rally next` and write required proposal/analysis files. +- During analysis, file at least one issue from `agent-a`. +- Optional negative checks: + - Duplicate identity: try `--as agent_a` after `agent-a` is joined (should fail). + - Duplicate draft title from another agent (should fail under strict policy). + +2. Negotiation: +- Use `position`/`challenge` actions with turn order. +- Confirm `agree` is rejected before cross-agent challenge prerequisites are met. +- Confirm non-turn-holder `position`/`challenge` fails with turn-holder precondition errors. + +3. Finalization: +- In `FinalizationWrite`, confirm missing `coverage-audit.md` or invalid `final.md` blocks completion. +- In `FinalizationReview`, confirm the writer cannot self-approve in multi-agent mode. + +## 3. Verify Output Artifacts + +1. Complete the session so it reaches `Done`. +2. Confirm `~/.rally/sessions/qa-negotiate-final/todo.md` exists and matches `final.md`. +3. Confirm `todo.md` is valid write-todo format (`## Spec` + `## Plan` with numbered steps). + +## 4. Verify Build Workflow Usability + +1. Create a build session using the produced `todo.md`: +`/Users/justin/code/rally/target/debug/rally create implement --name qa-negotiate-build --todo ~/.rally/sessions/qa-negotiate-final/todo.md --workspace /Users/justin/code/rally --reviewers 1` + +2. Join implementer/reviewer and run `rally next` once per role to confirm the todo parses and step instructions render. + +## 5. Verify Prompt/Behavior Reuse + +1. Compare `rally next` wording for proposal/analysis/finalization between: +- `builtin/plan` sessions (`rally create plan ...`) +- `demo/negotiate` sessions (`rally create workflow --workflow demo/negotiate ...`) + +2. Confirm shared instruction structure is preserved while strict negotiate rules are enforced. + +## 6. Cleanup + +`rm -rf ~/.rally/sessions/qa-negotiate-final ~/.rally/sessions/qa-negotiate-build` From 94c993c751b077776a9c241c5e32aad30b2976f8 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:24:26 -0600 Subject: [PATCH 10/11] Apply formatting updates across CLI and workflow modules --- crates/rally-workflow-build/src/lib.rs | 14 +- src/cli.rs | 12 +- src/command_surface.rs | 70 ++++++---- src/workflow/interop.rs | 169 ++++++++++++++----------- tests/command_install_run.rs | 84 +++++++----- 5 files changed, 211 insertions(+), 138 deletions(-) diff --git a/crates/rally-workflow-build/src/lib.rs b/crates/rally-workflow-build/src/lib.rs index 55c8f62..f2e7d31 100644 --- a/crates/rally-workflow-build/src/lib.rs +++ b/crates/rally-workflow-build/src/lib.rs @@ -253,8 +253,10 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { } } else if workflow_state.reviews.iter().any(|r| r.reviewer == agent) { if state.phase == SessionPhase::FinalReview { - Some("Your final review was submitted. Waiting for remaining reviewer verdicts." - .to_string()) + Some( + "Your final review was submitted. Waiting for remaining reviewer verdicts." + .to_string(), + ) } else { Some( "Your review was submitted. Waiting for remaining reviewer verdicts or decision." @@ -355,7 +357,9 @@ pub fn submit_review( .as_ref() .ok_or_else(|| anyhow!("cannot review without an active checkpoint"))?; - let step_dir = session_dir.join("reviews").join(review_bucket_name(checkpoint)); + let step_dir = session_dir + .join("reviews") + .join(review_bucket_name(checkpoint)); fs::create_dir_all(&step_dir)?; let review_file = step_dir.join(format!("{reviewer}.md")); let step_label = if checkpoint.step_number == 0 { @@ -472,7 +476,9 @@ pub fn process_review_state(state: &mut SessionState) -> Result> "final review requested changes; returned to implement phase".to_string(), )); } - return Ok(Some("changes requested; returned to implement phase".to_string())); + return Ok(Some( + "changes requested; returned to implement phase".to_string(), + )); } if all_approve { diff --git a/src/cli.rs b/src/cli.rs index ca78c20..f558e7f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -104,7 +104,10 @@ pub struct CreateArgs { pub todo: Option, #[arg(long)] pub workspace: Option, - #[arg(long, help = "Workflow id for `create workflow` (for example myco/security-review)")] + #[arg( + long, + help = "Workflow id for `create workflow` (for example myco/security-review)" + )] pub workflow: Option, #[arg( long, @@ -364,7 +367,12 @@ pub struct CommandRunArgs { #[derive(Args, Debug)] pub struct CommandDoctorArgs { - #[arg(long, value_enum, default_value = "all", help = "Target harness to inspect")] + #[arg( + long, + value_enum, + default_value = "all", + help = "Target harness to inspect" + )] pub target: CommandTargetArg, #[arg(long, help = "Print checks without changing filesystem")] pub dry_run: bool, diff --git a/src/command_surface.rs b/src/command_surface.rs index 8c3eee0..615dbef 100644 --- a/src/command_surface.rs +++ b/src/command_surface.rs @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize}; use crate::{ WorkflowRegistry, cli::{ - CommandDoctorArgs, CommandInstallArgs, CommandRunArgs, CommandTargetArg, JoinArgs, - CommandUninstallArgs, + CommandDoctorArgs, CommandInstallArgs, CommandRunArgs, CommandTargetArg, + CommandUninstallArgs, JoinArgs, }, commands, state::SessionHandle, @@ -238,7 +238,8 @@ struct AdapterEnvironment { impl AdapterEnvironment { fn detect() -> Result { - let home_dir = dirs::home_dir().ok_or_else(|| anyhow!("unable to resolve home directory"))?; + let home_dir = + dirs::home_dir().ok_or_else(|| anyhow!("unable to resolve home directory"))?; Ok(Self { home_dir }) } @@ -556,7 +557,8 @@ fn classify_install_action( return Ok(InstallAction::Create); } - let current = fs::read(path).with_context(|| format!("reading existing wrapper {}", path.display()))?; + let current = + fs::read(path).with_context(|| format!("reading existing wrapper {}", path.display()))?; if current == content.as_bytes() { return Ok(InstallAction::Noop); } @@ -661,7 +663,9 @@ fn doctor_with_environment( DoctorStatus::InstalledManagedModified } } - ManagedOwnership::ManagedForOtherTarget(_) => DoctorStatus::InstalledManagedWrongTarget, + ManagedOwnership::ManagedForOtherTarget(_) => { + DoctorStatus::InstalledManagedWrongTarget + } ManagedOwnership::NotManaged => DoctorStatus::InstalledUnmanaged, } }; @@ -761,25 +765,40 @@ fn resolve_run_context( prompter: &mut dyn Prompter, ) -> Result { let mut session = args.session.clone(); - let mut agent = args.agent.clone().map(|value| normalize_agent_selector(&value)); - let workflow = workflow_selector.clone().or_else(|| saved.and_then(|ctx| ctx.workflow.clone())); - - if session.is_none() && let Some(invite_ctx) = &invite { + let mut agent = args + .agent + .clone() + .map(|value| normalize_agent_selector(&value)); + let workflow = workflow_selector + .clone() + .or_else(|| saved.and_then(|ctx| ctx.workflow.clone())); + + if session.is_none() + && let Some(invite_ctx) = &invite + { session = Some(invite_ctx.session.clone()); } - if agent.is_none() && let Some(selector) = role_selector { + if agent.is_none() + && let Some(selector) = role_selector + { agent = Some(normalize_agent_selector(&selector)); } - if agent.is_none() && let Some(invite_ctx) = &invite { + if agent.is_none() + && let Some(invite_ctx) = &invite + { agent = Some(invite_ctx.agent.clone()); } - if session.is_none() && let Some(saved_ctx) = saved { + if session.is_none() + && let Some(saved_ctx) = saved + { session = Some(saved_ctx.session.clone()); } - if agent.is_none() && let Some(saved_ctx) = saved { + if agent.is_none() + && let Some(saved_ctx) = saved + { agent = Some(saved_ctx.agent.clone()); } @@ -941,8 +960,7 @@ mod tests { fn discover_install_path_uses_default_dot_directory_layout() -> Result<()> { let home = unique_dir("rally-command-adapter-layout"); let root = home.join(".droid"); - fs::create_dir_all(&root) - .with_context(|| format!("creating {}", root.display()))?; + fs::create_dir_all(&root).with_context(|| format!("creating {}", root.display()))?; let adapter = DotDirAdapter::new(HarnessTarget::Droid); let path = adapter.discover_install_path(&env_with_home(&home))?; assert_eq!(path, root.join("commands").join("rally.md")); @@ -1136,7 +1154,8 @@ mod tests { .contains("refusing to overwrite non-Rally file") ); - let uninstall = uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, false), &env)?; + let uninstall = + uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, false), &env)?; assert_eq!(uninstall[0].action, UninstallAction::SkippedUnmanaged); assert!(wrapper.exists()); Ok(()) @@ -1202,7 +1221,8 @@ mod tests { fs::create_dir_all(unmanaged.parent().expect("wrapper parent"))?; fs::write(&unmanaged, "custom content")?; - let outcome = uninstall_with_environment(&uninstall_args(CommandTargetArg::All, false), &env)?; + let outcome = + uninstall_with_environment(&uninstall_args(CommandTargetArg::All, false), &env)?; let codex = outcome .iter() .find(|o| o.target == HarnessTarget::Codex) @@ -1230,7 +1250,8 @@ mod tests { &render_ctx, )?; let wrapper = root.join("commands").join("rally.md"); - let outcome = uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, true), &env)?; + let outcome = + uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, true), &env)?; assert_eq!(outcome[0].action, UninstallAction::WouldRemoveManaged); assert!(wrapper.exists()); Ok(()) @@ -1283,14 +1304,8 @@ mod tests { agent: "reviewer-1".to_string(), }; let mut prompter = StaticPrompter::new(&[]); - let resolved = resolve_run_context( - &args, - None, - None, - Some(invite), - Some(&saved), - &mut prompter, - )?; + let resolved = + resolve_run_context(&args, None, None, Some(invite), Some(&saved), &mut prompter)?; assert_eq!(resolved.session, "token-session"); assert_eq!(resolved.agent, "reviewer-1"); Ok(()) @@ -1308,8 +1323,7 @@ mod tests { workflow: Some("build".to_string()), }; let mut prompter = StaticPrompter::new(&[]); - let resolved = - resolve_run_context(&args, None, None, None, Some(&saved), &mut prompter)?; + let resolved = resolve_run_context(&args, None, None, None, Some(&saved), &mut prompter)?; assert_eq!(resolved.session, "saved-session"); assert_eq!(resolved.agent, "saved-agent"); assert_eq!(resolved.workflow.as_deref(), Some("build")); diff --git a/src/workflow/interop.rs b/src/workflow/interop.rs index ef5743b..758874e 100644 --- a/src/workflow/interop.rs +++ b/src/workflow/interop.rs @@ -26,23 +26,29 @@ pub fn delegate_join( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = JoinContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_join(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Join(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for join: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = JoinContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_join(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Join(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for join: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_next_poll( @@ -52,23 +58,29 @@ pub fn delegate_next_poll( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = NextPollContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_next_poll(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::NextPoll(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for next poll: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = NextPollContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_next_poll(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::NextPoll(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for next poll: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_done( @@ -78,23 +90,29 @@ pub fn delegate_done( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = DoneContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_done(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Done(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for done: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = DoneContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_done(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Done(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for done: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_action( @@ -106,26 +124,32 @@ pub fn delegate_action( name: &str, args: &Value, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = ActionContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - name, - args, - }; - match dispatch_action(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Action(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for action '{}': {:?}", - workflow.id(), + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = ActionContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, name, - other - ), - } - }) + args, + }; + match dispatch_action(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Action(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for action '{}': {:?}", + workflow.id(), + name, + other + ), + } + }, + ) } pub fn delegate_status( @@ -177,7 +201,10 @@ where Ok(result) } -fn projected_child_session(parent: &SessionState, target: DelegationTarget<'_>) -> Result { +fn projected_child_session( + parent: &SessionState, + target: DelegationTarget<'_>, +) -> Result { validate_namespace(target.namespace)?; let mut child = parent.clone(); child.workflow_id = Some(target.workflow_id.to_string()); diff --git a/tests/command_install_run.rs b/tests/command_install_run.rs index 1b502ad..2ef43df 100644 --- a/tests/command_install_run.rs +++ b/tests/command_install_run.rs @@ -99,7 +99,13 @@ fn command_install_run_entrypoint_integration() -> Result<()> { let pi_root = temp_home.join(".pi"); let claude_root = temp_home.join(".claude"); let factory_root = temp_home.join(".factory"); - for root in [&codex_root, &droid_root, &pi_root, &claude_root, &factory_root] { + for root in [ + &codex_root, + &droid_root, + &pi_root, + &claude_root, + &factory_root, + ] { fs::create_dir_all(root)?; } @@ -137,7 +143,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { ®istry, ) .expect_err("unmanaged wrapper should require --force"); - assert!(err.to_string().contains("refusing to overwrite non-Rally file")); + assert!( + err.to_string() + .contains("refusing to overwrite non-Rally file") + ); run_cli( &["rally", "command", "doctor", "--target", "all", "--dry-run"], @@ -162,7 +171,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { ®istry, )?; assert!(!codex_wrapper.exists()); - assert_eq!(fs::read_to_string(&droid_wrapper)?, "custom wrapper content"); + assert_eq!( + fs::read_to_string(&droid_wrapper)?, + "custom wrapper content" + ); let todo_one = temp_home.join("todo-one.md"); let todo_two = temp_home.join("todo-two.md"); @@ -242,10 +254,7 @@ fn command_install_run_entrypoint_integration() -> Result<()> { let session_one_seen_before = read_agent_last_seen(&temp_home, &session_one, "implementer")?; assert_eq!( - run_cli( - &["rally", "command", "run", "--non-interactive"], - ®istry, - )?, + run_cli(&["rally", "command", "run", "--non-interactive"], ®istry,)?, 0 ); let session_one_seen_after = read_agent_last_seen(&temp_home, &session_one, "implementer")?; @@ -253,38 +262,44 @@ fn command_install_run_entrypoint_integration() -> Result<()> { assert_eq!( run_cli( - &[ - "rally", - "command", - "run", - &format!("rly:{session_two}:implementer"), - "--non-interactive", - ], - ®istry, - )?, + &[ + "rally", + "command", + "run", + &format!("rly:{session_two}:implementer"), + "--non-interactive", + ], + ®istry, + )?, 0 ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_two); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_two + ); assert_eq!( run_cli( - &[ - "rally", - "command", - "run", - "--session", - &session_one, - "--as", - "implementer", - "--invite", - &format!("rly:{session_two}:implementer"), - "--non-interactive", - ], - ®istry, - )?, + &[ + "rally", + "command", + "run", + "--session", + &session_one, + "--as", + "implementer", + "--invite", + &format!("rly:{session_two}:implementer"), + "--non-interactive", + ], + ®istry, + )?, 0 ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_one); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_one + ); fs::write(&context_store, "{not-json}")?; assert_eq!( @@ -327,7 +342,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stderr) ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_two); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_two + ); { let handle = SessionHandle::open(&session_two)?; From d2714397a3409d2ef9fc46a5e0231332a0ecc110 Mon Sep 17 00:00:00 2001 From: Justin Moon Date: Mon, 2 Mar 2026 21:57:39 -0600 Subject: [PATCH 11/11] Allow strict negotiate workflow with 2+ agents --- crates/rally-workflow-plan/src/lib.rs | 74 +++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 1e3d39d..1351069 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -150,7 +150,7 @@ impl PlanPolicyContract { pub fn strict_negotiate() -> Self { Self { registration: RegistrationPolicyContract { - min_agents: 3, + min_agents: 2, enforce_unique_identity: true, fail_closed_missing_peers: true, allow_single_agent_fallback: false, @@ -1429,7 +1429,8 @@ impl PlanPolicy for StrictNegotiationPolicy { let contract = self.contract(); if state.config.expected_agents < contract.registration.min_agents { bail!( - "strict negotiate policy requires at least 3 agents (got {})", + "strict negotiate policy requires at least {} agents (got {})", + contract.registration.min_agents, state.config.expected_agents ); } @@ -1473,17 +1474,21 @@ impl PlanPolicy for StrictNegotiationPolicy { .iter() .filter(|name| *name != agent) .count(); - let required = contract.agreement.min_cross_agent_challenges as usize; + let required = (contract.agreement.min_cross_agent_challenges as usize) + .min(state.agents.len().saturating_sub(1)); if cross_agent_challenges < required { - if required == 2 { - bail!( + match required { + 1 => bail!( + "strict negotiate policy requires at least one cross-agent challenge before agree" + ), + 2 => bail!( "strict negotiate policy requires at least two cross-agent challenges before agree" - ); + ), + _ => bail!( + "strict negotiate policy requires at least {} cross-agent challenges before agree", + required + ), } - bail!( - "strict negotiate policy requires at least {} cross-agent challenges before agree", - required - ); } Ok(()) } @@ -2039,7 +2044,7 @@ mod tests { fn strict_policy_contract_covers_negotiate_constraints() { let policy = StrictNegotiationPolicy; let contract = policy.contract(); - assert_eq!(contract.registration.min_agents, 3); + assert_eq!(contract.registration.min_agents, 2); assert!(contract.registration.enforce_unique_identity); assert!(contract.registration.fail_closed_missing_peers); assert!(contract.draft_issue_phase.enable_private_drafts); @@ -2053,6 +2058,53 @@ mod tests { assert_eq!(contract.finalization.min_agreed_issues, 1); } + #[test] + fn strict_policy_scales_agree_threshold_for_two_agents() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Threshold issue"); + issue.challenges.insert("agent-b".to_string()); + workflow_state.issues = vec![issue]; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let policy = StrictNegotiationPolicy; + policy.allow_agree(&state, 1, "agent-a")?; + Ok(()) + } + + #[test] + fn strict_policy_requires_two_cross_agent_challenges_with_three_agents() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 3); + add_agent(&mut state, "agent-b"); + add_agent(&mut state, "agent-c"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Threshold issue"); + issue.challenges.insert("agent-b".to_string()); + workflow_state.issues = vec![issue]; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let policy = StrictNegotiationPolicy; + let err = policy + .allow_agree(&state, 1, "agent-a") + .expect_err("three-agent sessions should still require two cross-agent challenges"); + assert!(err + .to_string() + .contains("at least two cross-agent challenges")); + + let mut workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter_mut() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + issue.challenges.insert("agent-c".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + policy.allow_agree(&state, 1, "agent-a")?; + Ok(()) + } + #[test] fn tracking_fields_capture_analysis_drafts_and_challenge_presence() -> Result<()> { let dir = temp_session_dir("rally-plan-tracking-test");