diff --git a/crates/rally-workflow-build/src/lib.rs b/crates/rally-workflow-build/src/lib.rs index 55c8f62..f2e7d31 100644 --- a/crates/rally-workflow-build/src/lib.rs +++ b/crates/rally-workflow-build/src/lib.rs @@ -253,8 +253,10 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { } } else if workflow_state.reviews.iter().any(|r| r.reviewer == agent) { if state.phase == SessionPhase::FinalReview { - Some("Your final review was submitted. Waiting for remaining reviewer verdicts." - .to_string()) + Some( + "Your final review was submitted. Waiting for remaining reviewer verdicts." + .to_string(), + ) } else { Some( "Your review was submitted. Waiting for remaining reviewer verdicts or decision." @@ -355,7 +357,9 @@ pub fn submit_review( .as_ref() .ok_or_else(|| anyhow!("cannot review without an active checkpoint"))?; - let step_dir = session_dir.join("reviews").join(review_bucket_name(checkpoint)); + let step_dir = session_dir + .join("reviews") + .join(review_bucket_name(checkpoint)); fs::create_dir_all(&step_dir)?; let review_file = step_dir.join(format!("{reviewer}.md")); let step_label = if checkpoint.step_number == 0 { @@ -472,7 +476,9 @@ pub fn process_review_state(state: &mut SessionState) -> Result> "final review requested changes; returned to implement phase".to_string(), )); } - return Ok(Some("changes requested; returned to implement phase".to_string())); + return Ok(Some( + "changes requested; returned to implement phase".to_string(), + )); } if all_approve { diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs index 101b98e..1351069 100644 --- a/crates/rally-workflow-plan/src/lib.rs +++ b/crates/rally-workflow-plan/src/lib.rs @@ -48,6 +48,169 @@ pub struct FinalizationState { pub reopened_for_issues: bool, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RegistrationPolicyContract { + #[serde(default = "default_registration_min_agents")] + pub min_agents: u32, + #[serde(default)] + pub enforce_unique_identity: bool, + #[serde(default)] + pub fail_closed_missing_peers: bool, + #[serde(default)] + pub allow_single_agent_fallback: bool, +} + +impl Default for RegistrationPolicyContract { + fn default() -> Self { + Self { + min_agents: default_registration_min_agents(), + enforce_unique_identity: false, + fail_closed_missing_peers: false, + allow_single_agent_fallback: true, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DraftIssuePolicyContract { + #[serde(default)] + pub enable_private_drafts: bool, + #[serde(default)] + pub merge_before_negotiation: bool, +} + +impl Default for DraftIssuePolicyContract { + fn default() -> Self { + Self { + enable_private_drafts: false, + merge_before_negotiation: false, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct AgreementPolicyContract { + #[serde(default)] + pub require_challenge_before_agree: bool, + #[serde(default)] + pub disallow_self_agreement: bool, + #[serde(default)] + pub min_cross_agent_challenges: u32, +} + +impl Default for AgreementPolicyContract { + fn default() -> Self { + Self { + require_challenge_before_agree: false, + disallow_self_agreement: false, + min_cross_agent_challenges: 0, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct FinalizationPolicyContract { + #[serde(default)] + pub require_coverage_audit: bool, + #[serde(default)] + pub require_final_doc: bool, + #[serde(default)] + pub require_writer_reviewer_separation: bool, + #[serde(default)] + pub allow_single_agent_fallback: bool, + #[serde(default)] + pub min_agreed_issues: u32, +} + +impl Default for FinalizationPolicyContract { + fn default() -> Self { + Self { + require_coverage_audit: false, + require_final_doc: false, + require_writer_reviewer_separation: false, + allow_single_agent_fallback: true, + min_agreed_issues: 0, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] +pub struct PlanPolicyContract { + #[serde(default)] + pub registration: RegistrationPolicyContract, + #[serde(default)] + pub draft_issue_phase: DraftIssuePolicyContract, + #[serde(default)] + pub agreement: AgreementPolicyContract, + #[serde(default)] + pub finalization: FinalizationPolicyContract, +} + +impl PlanPolicyContract { + pub fn strict_negotiate() -> Self { + Self { + registration: RegistrationPolicyContract { + min_agents: 2, + enforce_unique_identity: true, + fail_closed_missing_peers: true, + allow_single_agent_fallback: false, + }, + draft_issue_phase: DraftIssuePolicyContract { + enable_private_drafts: true, + merge_before_negotiation: true, + }, + agreement: AgreementPolicyContract { + require_challenge_before_agree: true, + disallow_self_agreement: true, + min_cross_agent_challenges: 2, + }, + finalization: FinalizationPolicyContract { + require_coverage_audit: true, + require_final_doc: true, + require_writer_reviewer_separation: true, + allow_single_agent_fallback: false, + min_agreed_issues: 1, + }, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DraftIssueState { + pub draft_id: u32, + pub title: String, + pub slug: String, + pub author: String, + pub file: String, + #[serde(default = "default_draft_question")] + pub question: String, + #[serde(default = "default_draft_context")] + pub context: String, + #[serde(default)] + pub merged_issue_id: Option, + pub created_at: DateTime, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct RegistrationTrackingState { + #[serde(default)] + pub canonical_agent_bindings: BTreeMap, + #[serde(default)] + pub fallback_enabled: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct NegotiationTrackingState { + #[serde(default)] + pub registration: RegistrationTrackingState, + #[serde(default)] + pub analysis_completed_agents: BTreeSet, + #[serde(default)] + pub draft_issues: Vec, + #[serde(default)] + pub issues_with_challenges: BTreeSet, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PlanWorkflowState { #[serde(default = "default_state_version")] @@ -58,6 +221,8 @@ pub struct PlanWorkflowState { pub issues: Vec, #[serde(default)] pub finalization: FinalizationState, + #[serde(default)] + pub tracking: NegotiationTrackingState, } impl Default for PlanWorkflowState { @@ -67,6 +232,7 @@ impl Default for PlanWorkflowState { turn: None, issues: Vec::new(), finalization: FinalizationState::default(), + tracking: NegotiationTrackingState::default(), } } } @@ -85,9 +251,21 @@ pub struct PlanDoneDispatch { } fn default_state_version() -> u32 { + 2 +} + +fn default_registration_min_agents() -> u32 { 1 } +fn default_draft_question() -> String { + "".to_string() +} + +fn default_draft_context() -> String { + "".to_string() +} + impl PlanWorkflowState { pub fn validate(&self) -> Result<()> { if let Some(turn) = &self.turn @@ -95,6 +273,23 @@ impl PlanWorkflowState { { bail!("workflow_state turn holder cannot be empty"); } + let mut seen_drafts = BTreeSet::new(); + for draft in &self.tracking.draft_issues { + if draft.author.trim().is_empty() { + bail!("workflow_state draft issue author cannot be empty"); + } + if draft.title.trim().is_empty() { + bail!("workflow_state draft issue title cannot be empty"); + } + if !seen_drafts.insert(draft.draft_id) { + bail!("workflow_state draft issue ids must be unique"); + } + } + for (canonical, agent) in &self.tracking.registration.canonical_agent_bindings { + if canonical.trim().is_empty() || agent.trim().is_empty() { + bail!("workflow_state registration bindings cannot contain empty values"); + } + } Ok(()) } } @@ -202,6 +397,7 @@ pub fn get_instruction(state: &SessionState, agent: &str, session_dir: &Path) -> } let mut lines = Vec::new(); + lines.push("Phase: Negotiation".to_string()); lines.push(format!( "It is your turn in negotiation (round {}).", turn.round @@ -311,6 +507,7 @@ pub fn file_issue( title: &str, question: Option<&str>, context: Option<&str>, + use_private_drafts: bool, ) -> Result { let mut workflow_state = read_workflow_state(state)?; @@ -318,6 +515,46 @@ pub fn file_issue( bail!("file-issue is only valid during analysis or finalization review"); } + let question = question.unwrap_or("").to_string(); + let context = context.unwrap_or("").to_string(); + let created_at = now(); + let slug = slugify(title); + if state.phase == SessionPhase::Analysis && use_private_drafts { + let next_draft_id = workflow_state + .tracking + .draft_issues + .iter() + .map(|d| d.draft_id) + .max() + .unwrap_or(0) + + 1; + let file_name = format!("{:02}-{slug}.md", next_draft_id); + let relative = PathBuf::from("analysis") + .join("drafts") + .join(agent) + .join(&file_name); + let draft_path = session_dir.join(&relative); + if let Some(parent) = draft_path.parent() { + fs::create_dir_all(parent)?; + } + let template = draft_issue_template(next_draft_id, title, &question, &context); + fs::write(&draft_path, template)?; + + workflow_state.tracking.draft_issues.push(DraftIssueState { + draft_id: next_draft_id, + title: title.to_string(), + slug, + author: agent.to_string(), + file: relative.display().to_string(), + question, + context, + merged_issue_id: None, + created_at, + }); + write_workflow_state(state, &workflow_state)?; + return Ok(draft_path.display().to_string()); + } + let next_id = workflow_state .issues .iter() @@ -325,30 +562,22 @@ pub fn file_issue( .max() .unwrap_or(0) + 1; - let slug = slugify(title); let file_name = format!("{:02}-{slug}.md", next_id); let issue_path = session_dir.join("issues").join(&file_name); - - let question = question.unwrap_or(""); - let context = context.unwrap_or(""); - let template = format!( - "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", - next_id, title, title, question, context - ); - + let template = issue_template(next_id, title, &question, &context); fs::write(&issue_path, template)?; workflow_state.issues.push(IssueState { id: next_id, slug, - file: file_name, + file: file_name.clone(), title: title.to_string(), author: agent.to_string(), status: IssueStatus::Open, positions: Default::default(), challenges: Default::default(), agreed_by: None, - created_at: now(), + created_at, }); if state.phase == SessionPhase::FinalizationReview { @@ -374,37 +603,41 @@ pub fn challenge_issue( ensure_turn_holder(state, &workflow_state, agent)?; refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; - let issue = workflow_state - .issues - .iter_mut() - .find(|i| i.id == issue_id) - .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + let (current_issue_id, position) = { + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; - if issue.status != IssueStatus::Open { - bail!("issue {:02} is not open", issue_id); - } + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } - let current_issue_id = issue.id; - let position = position_path(session_dir, issue, agent); - if let Some(parent) = position.parent() { - fs::create_dir_all(parent)?; - } + let current_issue_id = issue.id; + let position = position_path(session_dir, issue, agent); + if let Some(parent) = position.parent() { + fs::create_dir_all(parent)?; + } - if !position.exists() { - fs::write( - &position, - format!("# Position for issue {:02}\n\n", current_issue_id), - )?; - } + if !position.exists() { + fs::write( + &position, + format!("# Position for issue {:02}\n\n", current_issue_id), + )?; + } - let marker = format!( - "\n## Challenge {}\n- Add your challenge details here.\n", - now().to_rfc3339() - ); - append_text(&position, &marker)?; + let marker = format!( + "\n## Challenge {}\n- Add your challenge details here.\n", + now().to_rfc3339() + ); + append_text(&position, &marker)?; - issue.positions.insert(agent.to_string()); - issue.challenges.insert(agent.to_string()); + issue.positions.insert(agent.to_string()); + issue.challenges.insert(agent.to_string()); + (current_issue_id, position) + }; + sync_issue_challenge_tracking(&mut workflow_state, current_issue_id); let position_display = position.display().to_string(); write_workflow_state(state, &workflow_state)?; @@ -414,6 +647,55 @@ pub fn challenge_issue( )) } +pub fn position_issue( + state: &mut SessionState, + session_dir: &Path, + agent: &str, + issue_id: u32, +) -> Result { + let mut workflow_state = read_workflow_state(state)?; + ensure_turn_holder(state, &workflow_state, agent)?; + refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + + let (current_issue_id, position) = { + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } + + let current_issue_id = issue.id; + let position = position_path(session_dir, issue, agent); + if let Some(parent) = position.parent() { + fs::create_dir_all(parent)?; + } + + if !position.exists() { + fs::write( + &position, + format!( + "# Position for issue {:02}\n\n## Position\n- Add your position details here.\n", + current_issue_id + ), + )?; + } + + issue.positions.insert(agent.to_string()); + (current_issue_id, position) + }; + + let position_display = position.display().to_string(); + write_workflow_state(state, &workflow_state)?; + Ok(format!( + "position recorded for issue {:02} at {}", + current_issue_id, position_display + )) +} + pub fn agree_issue( state: &mut SessionState, session_dir: &Path, @@ -507,15 +789,19 @@ pub fn mark_done(state: &mut SessionState, agent: &str, session_dir: &Path) -> R if let Some(agent_state) = state.agents.get_mut(agent) { agent_state.phase_status = "analysis_done".to_string(); } + workflow_state + .tracking + .analysis_completed_agents + .insert(agent.to_string()); if state .agents .values() .all(|a| a.phase_status == "analysis_done") { - renumber_issues_interleaved(&mut workflow_state, session_dir)?; + merge_analysis_drafts_into_issues(&mut workflow_state, session_dir)?; state.phase = SessionPhase::Negotiation; initialize_turn(state, &mut workflow_state)?; - "analysis phase complete; issues normalized; advanced to negotiation".to_string() + "analysis phase complete; drafts merged; advanced to negotiation".to_string() } else { "marked analysis done".to_string() } @@ -610,10 +896,17 @@ fn finalize_and_complete( approver: &str, mode: &str, ) -> Result { + let coverage = session_dir.join("coverage-audit.md"); + if !coverage.exists() { + bail!("missing {}", coverage.display()); + } let final_doc = session_dir.join("final.md"); if !final_doc.exists() { bail!("missing {}", final_doc.display()); } + parse_steps_from_todo(&final_doc).map_err(|err| { + anyhow!("final.md must be write-todo format (## Spec + ## Plan with numbered steps): {err}") + })?; let todo_doc = session_dir.join("todo.md"); fs::copy(&final_doc, &todo_doc)?; workflow_state.finalization.reviewer = Some(approver.to_string()); @@ -697,6 +990,9 @@ fn refresh_negotiation_state_in_place( } } } + if recompute_challenge_tracking(workflow_state) { + changed = true; + } if maybe_resolve_negotiation(state, workflow_state) { changed = true; @@ -812,7 +1108,63 @@ fn maybe_resolve_negotiation( changed } -fn renumber_issues_interleaved( +fn merge_analysis_drafts_into_issues( + workflow_state: &mut PlanWorkflowState, + session_dir: &Path, +) -> Result<()> { + if workflow_state.tracking.draft_issues.is_empty() { + return interleave_existing_issues_by_author(workflow_state, session_dir); + } + + let interleaved = interleave_drafts_by_author(&workflow_state.tracking.draft_issues); + let issues_dir = session_dir.join("issues"); + fs::create_dir_all(&issues_dir)?; + if issues_dir.exists() { + for entry in fs::read_dir(&issues_dir)? { + let entry = entry?; + if !entry.file_type()?.is_file() { + continue; + } + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) == Some("md") { + fs::remove_file(path)?; + } + } + } + + let mut merged_ids = BTreeMap::new(); + let mut merged_issues = Vec::with_capacity(interleaved.len()); + for (idx, draft) in interleaved.into_iter().enumerate() { + let issue_id = (idx + 1) as u32; + let file_name = format!("{:02}-{}.md", issue_id, draft.slug); + let issue_path = issues_dir.join(&file_name); + let template = issue_template(issue_id, &draft.title, &draft.question, &draft.context); + fs::write(&issue_path, template)?; + + merged_ids.insert(draft.draft_id, issue_id); + merged_issues.push(IssueState { + id: issue_id, + slug: draft.slug, + file: file_name, + title: draft.title, + author: draft.author, + status: IssueStatus::Open, + positions: Default::default(), + challenges: Default::default(), + agreed_by: None, + created_at: draft.created_at, + }); + } + + workflow_state.issues = merged_issues; + for draft in &mut workflow_state.tracking.draft_issues { + draft.merged_issue_id = merged_ids.get(&draft.draft_id).copied(); + } + let _ = recompute_challenge_tracking(workflow_state); + Ok(()) +} + +fn interleave_existing_issues_by_author( workflow_state: &mut PlanWorkflowState, session_dir: &Path, ) -> Result<()> { @@ -855,9 +1207,41 @@ fn renumber_issues_interleaved( } workflow_state.issues = interleaved; + let _ = recompute_challenge_tracking(workflow_state); Ok(()) } +fn interleave_drafts_by_author(drafts: &[DraftIssueState]) -> Vec { + let mut drafts_sorted = drafts.to_vec(); + drafts_sorted.sort_by_key(|draft| draft.draft_id); + + let mut by_author: BTreeMap> = BTreeMap::new(); + for draft in drafts_sorted { + by_author + .entry(draft.author.clone()) + .or_default() + .push_back(draft); + } + + let mut interleaved = Vec::with_capacity(drafts.len()); + loop { + let mut progressed = false; + let authors = by_author.keys().cloned().collect::>(); + for author in authors { + let queue = by_author.get_mut(&author).expect("author key must exist"); + if let Some(draft) = queue.pop_front() { + interleaved.push(draft); + progressed = true; + } + } + if !progressed { + break; + } + } + + interleaved +} + fn ensure_turn_holder( state: &SessionState, workflow_state: &PlanWorkflowState, @@ -888,6 +1272,54 @@ fn issue_dir_path(session_dir: &Path, issue_id: u32, slug: &str) -> PathBuf { .join(format!("{:02}-{slug}", issue_id)) } +fn issue_template(issue_id: u32, title: &str, question: &str, context: &str) -> String { + format!( + "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", + issue_id, title, title, question, context + ) +} + +fn draft_issue_template(draft_id: u32, title: &str, question: &str, context: &str) -> String { + format!( + "# Draft Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Candidate Positions\n\n", + draft_id, title, title, question, context + ) +} + +fn sync_issue_challenge_tracking(workflow_state: &mut PlanWorkflowState, issue_id: u32) { + if let Some(issue) = workflow_state + .issues + .iter() + .find(|issue| issue.id == issue_id) + { + if issue.challenges.is_empty() { + workflow_state + .tracking + .issues_with_challenges + .remove(&issue_id); + } else { + workflow_state + .tracking + .issues_with_challenges + .insert(issue_id); + } + } +} + +fn recompute_challenge_tracking(workflow_state: &mut PlanWorkflowState) -> bool { + let mut next = BTreeSet::new(); + for issue in &workflow_state.issues { + if !issue.challenges.is_empty() { + next.insert(issue.id); + } + } + if next == workflow_state.tracking.issues_with_challenges { + return false; + } + workflow_state.tracking.issues_with_challenges = next; + true +} + fn append_text(path: &Path, text: &str) -> Result<()> { let existing = fs::read_to_string(path).unwrap_or_default(); fs::write(path, format!("{existing}{text}"))?; @@ -951,6 +1383,10 @@ fn slugify(input: &str) -> String { pub trait PlanPolicy { fn id(&self) -> &'static str; + fn contract(&self) -> PlanPolicyContract { + PlanPolicyContract::default() + } + fn allow_registration(&self, _state: &SessionState) -> Result<()> { Ok(()) } @@ -985,10 +1421,16 @@ impl PlanPolicy for StrictNegotiationPolicy { "strict-negotiate" } + fn contract(&self) -> PlanPolicyContract { + PlanPolicyContract::strict_negotiate() + } + fn allow_registration(&self, state: &SessionState) -> Result<()> { - if state.config.expected_agents < 3 { + let contract = self.contract(); + if state.config.expected_agents < contract.registration.min_agents { bail!( - "strict negotiate policy requires at least 3 agents (got {})", + "strict negotiate policy requires at least {} agents (got {})", + contract.registration.min_agents, state.config.expected_agents ); } @@ -1002,6 +1444,11 @@ impl PlanPolicy for StrictNegotiationPolicy { .issues .iter() .any(|issue| normalize_issue_key(&issue.title) == normalized) + || workflow_state + .tracking + .draft_issues + .iter() + .any(|draft| normalize_issue_key(&draft.title) == normalized) { bail!( "strict negotiate policy rejected duplicate issue title '{}'", @@ -1012,34 +1459,59 @@ impl PlanPolicy for StrictNegotiationPolicy { } fn allow_agree(&self, state: &SessionState, issue_id: u32, agent: &str) -> Result<()> { + let contract = self.contract(); let workflow_state = read_workflow_state(state)?; let issue = workflow_state .issues .iter() .find(|issue| issue.id == issue_id) .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + if !contract.agreement.require_challenge_before_agree { + return Ok(()); + } let cross_agent_challenges = issue .challenges .iter() .filter(|name| *name != agent) .count(); - if cross_agent_challenges < 2 { - bail!( - "strict negotiate policy requires at least two cross-agent challenges before agree" - ); + let required = (contract.agreement.min_cross_agent_challenges as usize) + .min(state.agents.len().saturating_sub(1)); + if cross_agent_challenges < required { + match required { + 1 => bail!( + "strict negotiate policy requires at least one cross-agent challenge before agree" + ), + 2 => bail!( + "strict negotiate policy requires at least two cross-agent challenges before agree" + ), + _ => bail!( + "strict negotiate policy requires at least {} cross-agent challenges before agree", + required + ), + } } Ok(()) } fn allow_finalize(&self, state: &SessionState, _agent: &str) -> Result<()> { + let contract = self.contract(); let workflow_state = read_workflow_state(state)?; let agreed = workflow_state .issues .iter() .filter(|issue| issue.status == IssueStatus::Agreed) .count(); - if agreed == 0 { - bail!("strict negotiate policy requires at least one AGREED issue before finalization"); + let required = contract.finalization.min_agreed_issues as usize; + if agreed < required { + if required == 1 { + bail!( + "strict negotiate policy requires at least one AGREED issue before finalization" + ); + } + bail!( + "strict negotiate policy requires at least {} AGREED issues before finalization", + required + ); } Ok(()) } @@ -1054,8 +1526,10 @@ impl PlanEngine

{ Self { policy } } - pub fn join(&self, state: &mut SessionState) -> Result<()> { + pub fn join(&self, state: &mut SessionState, agent: &str) -> Result<()> { self.policy.allow_registration(state)?; + self.enforce_registration_identity(state, agent)?; + self.enforce_registration_window(state)?; if state.phase == SessionPhase::Registration && state.agents.len() as u32 >= state.config.expected_agents { @@ -1070,6 +1544,7 @@ impl PlanEngine

{ session_dir: &Path, agent: &str, ) -> Result { + self.enforce_registration_window(state)?; refresh_negotiation_state(state, session_dir)?; enforce_turn_timeout(state)?; let _ = prepare_instruction(state, agent)?; @@ -1116,41 +1591,56 @@ impl PlanEngine

{ name: &str, args: &Value, ) -> Result { - let message = match name { - "file-issue" => { - let title = required_string(args, "title", name)?; - let title = self.policy.merge_issue_title(state, &title)?; - let question = optional_string(args, "question"); - let context = optional_string(args, "context"); - let path = file_issue( - state, - session_dir, - agent, - &title, - question.as_deref(), - context.as_deref(), - )?; - format!("filed issue at {path}") - } - "challenge" => { - let issue = required_u32(args, "issue", name)?; - challenge_issue(state, session_dir, agent, issue)? - } - "agree" => { - let issue = required_u32(args, "issue", name)?; - self.policy.allow_agree(state, issue, agent)?; - agree_issue(state, session_dir, agent, issue)? - } - _ => { - bail!( - "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, challenge, agree", - name, - self.policy.id() - ) + let message = (|| -> Result { + match name { + "file-issue" => { + let title = required_string(args, "title", name)?; + let title = self.policy.merge_issue_title(state, &title)?; + let question = optional_string(args, "question"); + let context = optional_string(args, "context"); + let contract = self.policy.contract(); + let path = file_issue( + state, + session_dir, + agent, + &title, + question.as_deref(), + context.as_deref(), + contract.draft_issue_phase.enable_private_drafts, + )?; + Ok(format!("filed issue at {path}")) + } + "challenge" => { + let issue = required_u32(args, "issue", name)?; + challenge_issue(state, session_dir, agent, issue) + } + "position" => { + let issue = required_u32(args, "issue", name)?; + position_issue(state, session_dir, agent, issue) + } + "agree" => { + let issue = required_u32(args, "issue", name)?; + self.policy.allow_agree(state, issue, agent)?; + agree_issue(state, session_dir, agent, issue) + } + _ => { + bail!( + "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, position, challenge, agree", + name, + self.policy.id() + ) + } } - }; - - Ok(message) + })(); + + message.map_err(|err| { + anyhow!( + "action '{}' failed: {}. Supported actions: file-issue, position, challenge, agree. Preconditions: {}", + name, + err, + action_preconditions(name) + ) + }) } pub fn status(&self, state: &SessionState) -> Result> { @@ -1187,9 +1677,89 @@ impl PlanEngine

{ "Issue summary: {} open, {} agreed, {} escalated", open, agreed, escalated )); + lines.push(format!( + "Finalization: writer={} reviewer={} todo_ready={} reopened_for_issues={}", + workflow_state + .finalization + .writer + .as_deref() + .unwrap_or("unassigned"), + workflow_state + .finalization + .reviewer + .as_deref() + .unwrap_or("pending"), + workflow_state.finalization.todo_ready, + workflow_state.finalization.reopened_for_issues + )); Ok(lines) } + + fn enforce_registration_identity(&self, state: &mut SessionState, agent: &str) -> Result<()> { + if state.phase != SessionPhase::Registration { + return Ok(()); + } + let contract = self.policy.contract(); + if !contract.registration.enforce_unique_identity { + return Ok(()); + } + + let mut workflow_state = read_workflow_state(state)?; + let changed = register_participant_identity(&mut workflow_state, agent)?; + if changed { + write_workflow_state(state, &workflow_state)?; + } + Ok(()) + } + + fn enforce_registration_window(&self, state: &mut SessionState) -> Result<()> { + if state.phase != SessionPhase::Registration { + return Ok(()); + } + + let contract = self.policy.contract(); + if !contract.registration.fail_closed_missing_peers { + return Ok(()); + } + + let joined = state.agents.len() as u32; + let expected = state.config.expected_agents; + if joined >= expected { + return Ok(()); + } + + let deadline = registration_deadline(state); + if now() < deadline { + return Ok(()); + } + + let mut workflow_state = read_workflow_state(state)?; + if !workflow_state.tracking.registration.fallback_enabled + && registration_fallback_requested() + { + workflow_state.tracking.registration.fallback_enabled = true; + write_workflow_state(state, &workflow_state)?; + } + if contract.registration.allow_single_agent_fallback + || workflow_state.tracking.registration.fallback_enabled + { + if joined > 0 { + state.phase = SessionPhase::Proposal; + } + return Ok(()); + } + + let missing = expected.saturating_sub(joined); + bail!( + "registration failed closed at {}: only {}/{} agents joined (missing {}). \ +recover by recreating with a longer --review-timeout-secs, or explicitly opt in fallback with RALLY_REGISTRATION_FALLBACK=1 and rerun join/next", + deadline.to_rfc3339(), + joined, + expected, + missing + ); + } } pub fn proposal_instruction( @@ -1199,7 +1769,7 @@ pub fn proposal_instruction( agent: &str, ) -> String { format!( - "Write your proposal to: {proposal_path}\nSession topic: {topic}\nAfter writing it, run: rally done --session {session} --as {agent}" + "Phase: Proposal\nSession topic: {topic}\nRequired file:\n- {proposal_path}\nNext command:\n- rally done --session {session} --as {agent}" ) } @@ -1210,7 +1780,7 @@ pub fn analysis_instruction( agent: &str, ) -> String { format!( - "Read proposal files:\n{proposals}\nWrite divergence analysis to {analysis_path}.\nFile issues with: rally plan file-issue --session {session} --as {agent} --title \"...\"\nWhen complete, run: rally done --session {session} --as {agent}" + "Phase: Analysis\nRead proposal files:\n{proposals}\nRequired file:\n- {analysis_path}\nOptional action:\n- rally plan file-issue --session {session} --as {agent} --title \"...\"\nNext command when analysis is complete:\n- rally done --session {session} --as {agent}" ) } @@ -1221,7 +1791,7 @@ pub fn finalization_write_instruction( agent: &str, ) -> String { format!( - "Write final artifacts: {coverage_path} and {final_path}.\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nThen run: rally done --session {session} --as {agent}" + "Phase: FinalizationWrite\nRequired files:\n- {coverage_path}\n- {final_path}\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nNext command:\n- rally done --session {session} --as {agent}" ) } @@ -1232,10 +1802,24 @@ pub fn finalization_review_instruction( agent: &str, ) -> String { format!( - "Review {coverage_path} and {final_path}. If approved, run rally done --session {session} --as {agent}. If rejected, file issues with rally plan file-issue, which returns the session to negotiation." + "Phase: FinalizationReview\nRequired files to review:\n- {coverage_path}\n- {final_path}\nIf approved, run:\n- rally done --session {session} --as {agent}\nIf changes are needed, file issue(s) and return to negotiation:\n- rally plan file-issue --session {session} --as {agent} --title \"...\"" ) } +fn action_preconditions(name: &str) -> &'static str { + match name { + "file-issue" => { + "requires analysis (or finalization review) phase; `title` is required in args-json" + } + "position" => "requires negotiation phase, your active turn, and an open issue id", + "challenge" => "requires negotiation phase, your active turn, and an open issue id", + "agree" => { + "requires negotiation phase, your active turn, an open issue id, plus cross-agent position and challenge prerequisites" + } + _ => "see supported actions list", + } +} + fn required_string(args: &Value, key: &str, action: &str) -> Result { args.get(key) .and_then(Value::as_str) @@ -1285,3 +1869,614 @@ fn normalize_issue_key(input: &str) -> String { trimmed } } + +fn register_participant_identity( + workflow_state: &mut PlanWorkflowState, + agent: &str, +) -> Result { + let canonical = normalize_agent_identity(agent); + if canonical.is_empty() { + bail!( + "registration rejected agent '{}': canonical identity was empty; choose a non-empty --as value", + agent + ); + } + + if let Some(existing) = workflow_state + .tracking + .registration + .canonical_agent_bindings + .get(&canonical) + { + if existing != agent { + bail!( + "registration rejected '{}': participant identity '{}' is already registered as '{}'. \ +recover by joining as '{}' or using a distinct identity", + agent, + canonical, + existing, + existing + ); + } + return Ok(false); + } + + workflow_state + .tracking + .registration + .canonical_agent_bindings + .insert(canonical, agent.to_string()); + Ok(true) +} + +fn normalize_agent_identity(input: &str) -> String { + normalize_issue_key(input) +} + +fn registration_deadline(state: &SessionState) -> DateTime { + state.created_at + Duration::seconds(state.config.review_timeout_secs as i64) +} + +fn registration_fallback_requested() -> bool { + std::env::var("RALLY_REGISTRATION_FALLBACK") + .ok() + .map(|value| truthy_env(&value)) + .unwrap_or(false) +} + +fn truthy_env(value: &str) -> bool { + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use rally_core::{AgentState, Config, SessionType}; + use serde_json::json; + + fn test_state(phase: SessionPhase) -> SessionState { + test_state_with_expected_agents(phase, 1) + } + + fn test_state_with_expected_agents(phase: SessionPhase, expected_agents: u32) -> SessionState { + let now_ts = now(); + let mut agents = BTreeMap::new(); + agents.insert( + "agent-a".to_string(), + AgentState { + name: "agent-a".to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "analysis_pending".to_string(), + rounds: 0, + }, + ); + SessionState { + name: "test-session".to_string(), + session_type: SessionType::Workflow, + phase, + created_at: now_ts, + config: Config { + expected_agents, + max_rounds: 3, + turn_timeout_secs: 300, + review_timeout_secs: 300, + }, + agents, + topic: None, + todo_path: None, + workspace: None, + workflow_id: Some("demo/negotiate".to_string()), + workflow_source: Some("buildtime".to_string()), + workflow_version: None, + workflow_state: Some(initial_workflow_state().expect("initial workflow state")), + } + } + + fn add_agent(state: &mut SessionState, name: &str) { + let now_ts = now(); + state.agents.insert( + name.to_string(), + AgentState { + name: name.to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "analysis_pending".to_string(), + rounds: 0, + }, + ); + } + + fn temp_session_dir(prefix: &str) -> PathBuf { + let path = std::env::temp_dir().join(format!( + "{}-{}", + prefix, + now().timestamp_nanos_opt().unwrap_or_default() + )); + fs::create_dir_all(path.join("analysis")).expect("create analysis dir"); + fs::create_dir_all(path.join("issues")).expect("create issues dir"); + path + } + + fn open_issue(id: u32, author: &str, title: &str) -> IssueState { + IssueState { + id, + slug: slugify(title), + file: format!("{:02}-{}.md", id, slugify(title)), + title: title.to_string(), + author: author.to_string(), + status: IssueStatus::Open, + positions: BTreeSet::new(), + challenges: BTreeSet::new(), + agreed_by: None, + created_at: now(), + } + } + + fn valid_final_doc() -> &'static str { + "## Spec\n\nfinalize\n\n## Plan\n\n1. Ship\nAcceptance criteria:\n- done\n" + } + + #[test] + fn legacy_state_deserializes_with_new_tracking_defaults() -> Result<()> { + let legacy = json!({ + "state_version": 1, + "turn": null, + "issues": [], + "finalization": { + "writer": null, + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + } + }); + let parsed = parse_workflow_state(Some(legacy))?; + assert!(parsed.tracking.analysis_completed_agents.is_empty()); + assert!(parsed.tracking.draft_issues.is_empty()); + assert!(parsed.tracking.issues_with_challenges.is_empty()); + Ok(()) + } + + #[test] + fn strict_policy_contract_covers_negotiate_constraints() { + let policy = StrictNegotiationPolicy; + let contract = policy.contract(); + assert_eq!(contract.registration.min_agents, 2); + assert!(contract.registration.enforce_unique_identity); + assert!(contract.registration.fail_closed_missing_peers); + assert!(contract.draft_issue_phase.enable_private_drafts); + assert!(contract.draft_issue_phase.merge_before_negotiation); + assert!(contract.agreement.require_challenge_before_agree); + assert!(contract.agreement.disallow_self_agreement); + assert_eq!(contract.agreement.min_cross_agent_challenges, 2); + assert!(contract.finalization.require_writer_reviewer_separation); + assert!(contract.finalization.require_coverage_audit); + assert!(contract.finalization.require_final_doc); + assert_eq!(contract.finalization.min_agreed_issues, 1); + } + + #[test] + fn strict_policy_scales_agree_threshold_for_two_agents() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Threshold issue"); + issue.challenges.insert("agent-b".to_string()); + workflow_state.issues = vec![issue]; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let policy = StrictNegotiationPolicy; + policy.allow_agree(&state, 1, "agent-a")?; + Ok(()) + } + + #[test] + fn strict_policy_requires_two_cross_agent_challenges_with_three_agents() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 3); + add_agent(&mut state, "agent-b"); + add_agent(&mut state, "agent-c"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Threshold issue"); + issue.challenges.insert("agent-b".to_string()); + workflow_state.issues = vec![issue]; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let policy = StrictNegotiationPolicy; + let err = policy + .allow_agree(&state, 1, "agent-a") + .expect_err("three-agent sessions should still require two cross-agent challenges"); + assert!(err + .to_string() + .contains("at least two cross-agent challenges")); + + let mut workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter_mut() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + issue.challenges.insert("agent-c".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + policy.allow_agree(&state, 1, "agent-a")?; + Ok(()) + } + + #[test] + fn tracking_fields_capture_analysis_drafts_and_challenge_presence() -> Result<()> { + let dir = temp_session_dir("rally-plan-tracking-test"); + let mut state = test_state(SessionPhase::Analysis); + + fs::write( + dir.join("analysis").join("agent-a.md"), + "# Analysis\n\nDrafted independently.\n", + )?; + let _ = file_issue( + &mut state, + &dir, + "agent-a", + "Draft tracking issue", + None, + None, + true, + )?; + let workflow_state = read_workflow_state(&state)?; + assert_eq!(workflow_state.tracking.draft_issues.len(), 1); + assert!(workflow_state.tracking.analysis_completed_agents.is_empty()); + + let _ = mark_done(&mut state, "agent-a", &dir)?; + let workflow_state = read_workflow_state(&state)?; + assert!( + workflow_state + .tracking + .analysis_completed_agents + .contains("agent-a") + ); + + let _ = challenge_issue(&mut state, &dir, "agent-a", 1)?; + let workflow_state = read_workflow_state(&state)?; + assert!(workflow_state.tracking.issues_with_challenges.contains(&1)); + + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn analysis_file_issue_keeps_issues_private_until_merge() -> Result<()> { + let dir = temp_session_dir("rally-plan-private-draft-test"); + let mut state = test_state(SessionPhase::Analysis); + let path = file_issue( + &mut state, + &dir, + "agent-a", + "Private draft", + Some("Why"), + Some("Because"), + true, + )?; + assert!( + path.contains("/analysis/drafts/agent-a/"), + "draft path should stay under agent-specific analysis drafts" + ); + let workflow_state = read_workflow_state(&state)?; + assert!(workflow_state.issues.is_empty()); + assert_eq!(workflow_state.tracking.draft_issues.len(), 1); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn analysis_file_issue_without_private_drafts_writes_issue_immediately() -> Result<()> { + let dir = temp_session_dir("rally-plan-direct-issue-test"); + let mut state = test_state(SessionPhase::Analysis); + let path = file_issue( + &mut state, + &dir, + "agent-a", + "Direct issue", + Some("Q"), + Some("C"), + false, + )?; + assert!(path.contains("/issues/")); + let workflow_state = read_workflow_state(&state)?; + assert_eq!(workflow_state.issues.len(), 1); + assert!(workflow_state.tracking.draft_issues.is_empty()); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn analysis_done_merges_drafts_interleaved_by_author() -> Result<()> { + let dir = temp_session_dir("rally-plan-merge-drafts-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Analysis, 2); + add_agent(&mut state, "agent-b"); + + fs::write(dir.join("analysis").join("agent-a.md"), "# A\n")?; + fs::write(dir.join("analysis").join("agent-b.md"), "# B\n")?; + + let _ = file_issue(&mut state, &dir, "agent-a", "A1", None, None, true)?; + let _ = file_issue(&mut state, &dir, "agent-a", "A2", None, None, true)?; + let _ = file_issue(&mut state, &dir, "agent-b", "B1", None, None, true)?; + + let _ = mark_done(&mut state, "agent-a", &dir)?; + let _ = mark_done(&mut state, "agent-b", &dir)?; + + let workflow_state = read_workflow_state(&state)?; + assert_eq!(state.phase, SessionPhase::Negotiation); + assert_eq!(workflow_state.issues.len(), 3); + assert_eq!(workflow_state.issues[0].author, "agent-a"); + assert_eq!(workflow_state.issues[1].author, "agent-b"); + assert_eq!(workflow_state.issues[2].author, "agent-a"); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn position_action_records_position_without_challenge() -> Result<()> { + let dir = temp_session_dir("rally-plan-position-action-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Position action issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let _ = position_issue(&mut state, &dir, "agent-a", 1)?; + let workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + assert!(issue.positions.contains("agent-a")); + assert!(issue.challenges.is_empty()); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn agree_requires_cross_agent_position_and_challenge() -> Result<()> { + let dir = temp_session_dir("rally-plan-agree-gates-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + let mut issue = open_issue(1, "agent-a", "Agreement gate issue"); + issue.positions.insert("agent-a".to_string()); + issue.challenges.insert("agent-a".to_string()); + workflow_state.issues = vec![issue]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = agree_issue(&mut state, &dir, "agent-a", 1) + .expect_err("agreement should fail with only self positions"); + assert!(err.to_string().contains("only your own positions")); + + let mut workflow_state = read_workflow_state(&state)?; + let issue = workflow_state + .issues + .iter_mut() + .find(|issue| issue.id == 1) + .expect("issue should exist"); + issue.positions.insert("agent-b".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = agree_issue(&mut state, &dir, "agent-a", 1) + .expect_err("agreement should fail without cross-agent challenge"); + assert!(err.to_string().contains("no challenge from another agent")); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn unresolved_issues_escalate_after_round_limit() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + state.config.max_rounds = 1; + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Escalation issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + + advance_turn(&mut state, &mut workflow_state)?; + assert_eq!(state.phase, SessionPhase::Negotiation); + advance_turn(&mut state, &mut workflow_state)?; + assert_eq!(state.phase, SessionPhase::FinalizationWrite); + assert_eq!(workflow_state.issues[0].status, IssueStatus::Escalated); + Ok(()) + } + + #[test] + fn finalization_write_requires_artifacts_and_valid_format() -> Result<()> { + let dir = temp_session_dir("rally-plan-finalization-write-test"); + let mut state = test_state_with_expected_agents(SessionPhase::FinalizationWrite, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.finalization.writer = Some("agent-a".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("finalization write should require coverage-audit.md"); + assert!(err.to_string().contains("coverage-audit.md")); + + fs::write(dir.join("coverage-audit.md"), "coverage")?; + fs::write(dir.join("final.md"), "not a todo format")?; + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("final.md should require write-todo format"); + assert!(err.to_string().contains("write-todo format")); + + fs::write(dir.join("final.md"), valid_final_doc())?; + let message = mark_done(&mut state, "agent-a", &dir)?; + assert!(message.contains("advanced to review")); + assert_eq!(state.phase, SessionPhase::FinalizationReview); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn finalization_review_requires_distinct_reviewer_and_writes_todo() -> Result<()> { + let dir = temp_session_dir("rally-plan-finalization-review-test"); + let mut state = test_state_with_expected_agents(SessionPhase::FinalizationReview, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.finalization.writer = Some("agent-a".to_string()); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + fs::write(dir.join("coverage-audit.md"), "coverage")?; + fs::write(dir.join("final.md"), valid_final_doc())?; + + let err = mark_done(&mut state, "agent-a", &dir) + .expect_err("writer should not be able to self-approve in multi-agent sessions"); + assert!(err.to_string().contains("cannot self-approve")); + + let message = mark_done(&mut state, "agent-b", &dir)?; + assert!(message.contains("completed session")); + assert_eq!(state.phase, SessionPhase::Done); + assert!(dir.join("todo.md").exists()); + let todo = fs::read_to_string(dir.join("todo.md"))?; + assert_eq!(todo, valid_final_doc()); + let workflow_state = read_workflow_state(&state)?; + assert_eq!( + workflow_state.finalization.reviewer.as_deref(), + Some("agent-b") + ); + assert!(workflow_state.finalization.todo_ready); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn status_includes_turn_issue_summary_and_finalization_details() -> Result<()> { + let state = test_state_with_expected_agents(SessionPhase::Negotiation, 1); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 2, + }); + workflow_state.finalization.writer = Some("agent-a".to_string()); + workflow_state.finalization.reviewer = Some("agent-b".to_string()); + workflow_state.finalization.todo_ready = true; + workflow_state.issues = vec![open_issue(1, "agent-a", "Status issue")]; + let mut state = state; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let lines = PlanEngine::new(DefaultPlanPolicy).status(&state)?; + assert!( + lines + .iter() + .any(|line| line.starts_with("Turn: holder=agent-a")) + ); + assert!( + lines + .iter() + .any(|line| line.contains("Issue summary: 1 open, 0 agreed, 0 escalated")) + ); + assert!( + lines + .iter() + .any(|line| line.contains("Finalization: writer=agent-a reviewer=agent-b")) + ); + Ok(()) + } + + #[test] + fn action_errors_include_supported_actions_and_preconditions() -> Result<()> { + let dir = temp_session_dir("rally-plan-action-errors-test"); + let mut state = test_state_with_expected_agents(SessionPhase::Negotiation, 2); + add_agent(&mut state, "agent-b"); + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.issues = vec![open_issue(1, "agent-a", "Action error issue")]; + workflow_state.turn = Some(TurnState { + holder: "agent-a".to_string(), + started_at: now(), + round: 1, + }); + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let err = PlanEngine::new(DefaultPlanPolicy) + .action(&mut state, &dir, "agent-a", "agree", &json!({})) + .expect_err("missing issue id should be reported with preconditions"); + let err_msg = err.to_string(); + assert!(err_msg.contains("Supported actions: file-issue, position, challenge, agree")); + assert!(err_msg.contains("Preconditions:")); + let _ = fs::remove_dir_all(&dir); + Ok(()) + } + + #[test] + fn instructions_include_phase_required_files_and_next_command() { + let proposal = proposal_instruction("/tmp/proposal.md", "topic", "session-a", "agent-a"); + assert!(proposal.contains("Phase: Proposal")); + assert!(proposal.contains("Required file:")); + assert!(proposal.contains("Next command:")); + } + + #[test] + fn strict_registration_rejects_duplicate_canonical_identity() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + let engine = PlanEngine::new(StrictNegotiationPolicy); + + engine.join(&mut state, "agent-a")?; + + let now_ts = now(); + state.agents.insert( + "agent_a".to_string(), + AgentState { + name: "agent_a".to_string(), + joined_at: now_ts, + last_seen: now_ts, + phase_status: "joined".to_string(), + rounds: 0, + }, + ); + let err = engine + .join(&mut state, "agent_a") + .expect_err("duplicate canonical identity should fail"); + assert!(err.to_string().contains("identity 'agent-a'")); + Ok(()) + } + + #[test] + fn strict_registration_fails_closed_after_deadline_without_fallback() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + state.created_at = now() - Duration::seconds(600); + state.config.review_timeout_secs = 1; + let engine = PlanEngine::new(StrictNegotiationPolicy); + + let err = engine + .join(&mut state, "agent-a") + .expect_err("expired registration should fail closed"); + assert!(err.to_string().contains("registration failed closed")); + assert!(err.to_string().contains("RALLY_REGISTRATION_FALLBACK=1")); + Ok(()) + } + + #[test] + fn strict_registration_allows_explicit_fallback_flag() -> Result<()> { + let mut state = test_state_with_expected_agents(SessionPhase::Registration, 3); + state.created_at = now() - Duration::seconds(600); + state.config.review_timeout_secs = 1; + let mut workflow_state = read_workflow_state(&state)?; + workflow_state.tracking.registration.fallback_enabled = true; + state.workflow_state = Some(encode_workflow_state(&workflow_state)?); + + let engine = PlanEngine::new(StrictNegotiationPolicy); + engine.join(&mut state, "agent-a")?; + assert_eq!(state.phase, SessionPhase::Proposal); + Ok(()) + } +} diff --git a/docs/manual-qa-negotiate-workflow.md b/docs/manual-qa-negotiate-workflow.md new file mode 100644 index 0000000..6b94664 --- /dev/null +++ b/docs/manual-qa-negotiate-workflow.md @@ -0,0 +1,58 @@ +# Manual QA: Negotiate Workflow + +Run from repository root using the built binary: + +`/Users/justin/code/rally/target/debug/rally` + +## 1. Create and Join + +1. Create session: +`/Users/justin/code/rally/target/debug/rally create workflow --name qa-negotiate-final --workflow demo/negotiate --agents 3 --topic "manual qa negotiate"` + +2. Join participants: +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-a` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-b` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate-final --as agent-c` + +## 2. Validate Rule Enforcement + +1. Proposal + analysis: +- For each agent, use `rally next` and write required proposal/analysis files. +- During analysis, file at least one issue from `agent-a`. +- Optional negative checks: + - Duplicate identity: try `--as agent_a` after `agent-a` is joined (should fail). + - Duplicate draft title from another agent (should fail under strict policy). + +2. Negotiation: +- Use `position`/`challenge` actions with turn order. +- Confirm `agree` is rejected before cross-agent challenge prerequisites are met. +- Confirm non-turn-holder `position`/`challenge` fails with turn-holder precondition errors. + +3. Finalization: +- In `FinalizationWrite`, confirm missing `coverage-audit.md` or invalid `final.md` blocks completion. +- In `FinalizationReview`, confirm the writer cannot self-approve in multi-agent mode. + +## 3. Verify Output Artifacts + +1. Complete the session so it reaches `Done`. +2. Confirm `~/.rally/sessions/qa-negotiate-final/todo.md` exists and matches `final.md`. +3. Confirm `todo.md` is valid write-todo format (`## Spec` + `## Plan` with numbered steps). + +## 4. Verify Build Workflow Usability + +1. Create a build session using the produced `todo.md`: +`/Users/justin/code/rally/target/debug/rally create implement --name qa-negotiate-build --todo ~/.rally/sessions/qa-negotiate-final/todo.md --workspace /Users/justin/code/rally --reviewers 1` + +2. Join implementer/reviewer and run `rally next` once per role to confirm the todo parses and step instructions render. + +## 5. Verify Prompt/Behavior Reuse + +1. Compare `rally next` wording for proposal/analysis/finalization between: +- `builtin/plan` sessions (`rally create plan ...`) +- `demo/negotiate` sessions (`rally create workflow --workflow demo/negotiate ...`) + +2. Confirm shared instruction structure is preserved while strict negotiate rules are enforced. + +## 6. Cleanup + +`rm -rf ~/.rally/sessions/qa-negotiate-final ~/.rally/sessions/qa-negotiate-build` diff --git a/src/cli.rs b/src/cli.rs index ca78c20..f558e7f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -104,7 +104,10 @@ pub struct CreateArgs { pub todo: Option, #[arg(long)] pub workspace: Option, - #[arg(long, help = "Workflow id for `create workflow` (for example myco/security-review)")] + #[arg( + long, + help = "Workflow id for `create workflow` (for example myco/security-review)" + )] pub workflow: Option, #[arg( long, @@ -364,7 +367,12 @@ pub struct CommandRunArgs { #[derive(Args, Debug)] pub struct CommandDoctorArgs { - #[arg(long, value_enum, default_value = "all", help = "Target harness to inspect")] + #[arg( + long, + value_enum, + default_value = "all", + help = "Target harness to inspect" + )] pub target: CommandTargetArg, #[arg(long, help = "Print checks without changing filesystem")] pub dry_run: bool, diff --git a/src/command_surface.rs b/src/command_surface.rs index 8c3eee0..615dbef 100644 --- a/src/command_surface.rs +++ b/src/command_surface.rs @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize}; use crate::{ WorkflowRegistry, cli::{ - CommandDoctorArgs, CommandInstallArgs, CommandRunArgs, CommandTargetArg, JoinArgs, - CommandUninstallArgs, + CommandDoctorArgs, CommandInstallArgs, CommandRunArgs, CommandTargetArg, + CommandUninstallArgs, JoinArgs, }, commands, state::SessionHandle, @@ -238,7 +238,8 @@ struct AdapterEnvironment { impl AdapterEnvironment { fn detect() -> Result { - let home_dir = dirs::home_dir().ok_or_else(|| anyhow!("unable to resolve home directory"))?; + let home_dir = + dirs::home_dir().ok_or_else(|| anyhow!("unable to resolve home directory"))?; Ok(Self { home_dir }) } @@ -556,7 +557,8 @@ fn classify_install_action( return Ok(InstallAction::Create); } - let current = fs::read(path).with_context(|| format!("reading existing wrapper {}", path.display()))?; + let current = + fs::read(path).with_context(|| format!("reading existing wrapper {}", path.display()))?; if current == content.as_bytes() { return Ok(InstallAction::Noop); } @@ -661,7 +663,9 @@ fn doctor_with_environment( DoctorStatus::InstalledManagedModified } } - ManagedOwnership::ManagedForOtherTarget(_) => DoctorStatus::InstalledManagedWrongTarget, + ManagedOwnership::ManagedForOtherTarget(_) => { + DoctorStatus::InstalledManagedWrongTarget + } ManagedOwnership::NotManaged => DoctorStatus::InstalledUnmanaged, } }; @@ -761,25 +765,40 @@ fn resolve_run_context( prompter: &mut dyn Prompter, ) -> Result { let mut session = args.session.clone(); - let mut agent = args.agent.clone().map(|value| normalize_agent_selector(&value)); - let workflow = workflow_selector.clone().or_else(|| saved.and_then(|ctx| ctx.workflow.clone())); - - if session.is_none() && let Some(invite_ctx) = &invite { + let mut agent = args + .agent + .clone() + .map(|value| normalize_agent_selector(&value)); + let workflow = workflow_selector + .clone() + .or_else(|| saved.and_then(|ctx| ctx.workflow.clone())); + + if session.is_none() + && let Some(invite_ctx) = &invite + { session = Some(invite_ctx.session.clone()); } - if agent.is_none() && let Some(selector) = role_selector { + if agent.is_none() + && let Some(selector) = role_selector + { agent = Some(normalize_agent_selector(&selector)); } - if agent.is_none() && let Some(invite_ctx) = &invite { + if agent.is_none() + && let Some(invite_ctx) = &invite + { agent = Some(invite_ctx.agent.clone()); } - if session.is_none() && let Some(saved_ctx) = saved { + if session.is_none() + && let Some(saved_ctx) = saved + { session = Some(saved_ctx.session.clone()); } - if agent.is_none() && let Some(saved_ctx) = saved { + if agent.is_none() + && let Some(saved_ctx) = saved + { agent = Some(saved_ctx.agent.clone()); } @@ -941,8 +960,7 @@ mod tests { fn discover_install_path_uses_default_dot_directory_layout() -> Result<()> { let home = unique_dir("rally-command-adapter-layout"); let root = home.join(".droid"); - fs::create_dir_all(&root) - .with_context(|| format!("creating {}", root.display()))?; + fs::create_dir_all(&root).with_context(|| format!("creating {}", root.display()))?; let adapter = DotDirAdapter::new(HarnessTarget::Droid); let path = adapter.discover_install_path(&env_with_home(&home))?; assert_eq!(path, root.join("commands").join("rally.md")); @@ -1136,7 +1154,8 @@ mod tests { .contains("refusing to overwrite non-Rally file") ); - let uninstall = uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, false), &env)?; + let uninstall = + uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, false), &env)?; assert_eq!(uninstall[0].action, UninstallAction::SkippedUnmanaged); assert!(wrapper.exists()); Ok(()) @@ -1202,7 +1221,8 @@ mod tests { fs::create_dir_all(unmanaged.parent().expect("wrapper parent"))?; fs::write(&unmanaged, "custom content")?; - let outcome = uninstall_with_environment(&uninstall_args(CommandTargetArg::All, false), &env)?; + let outcome = + uninstall_with_environment(&uninstall_args(CommandTargetArg::All, false), &env)?; let codex = outcome .iter() .find(|o| o.target == HarnessTarget::Codex) @@ -1230,7 +1250,8 @@ mod tests { &render_ctx, )?; let wrapper = root.join("commands").join("rally.md"); - let outcome = uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, true), &env)?; + let outcome = + uninstall_with_environment(&uninstall_args(CommandTargetArg::Pi, true), &env)?; assert_eq!(outcome[0].action, UninstallAction::WouldRemoveManaged); assert!(wrapper.exists()); Ok(()) @@ -1283,14 +1304,8 @@ mod tests { agent: "reviewer-1".to_string(), }; let mut prompter = StaticPrompter::new(&[]); - let resolved = resolve_run_context( - &args, - None, - None, - Some(invite), - Some(&saved), - &mut prompter, - )?; + let resolved = + resolve_run_context(&args, None, None, Some(invite), Some(&saved), &mut prompter)?; assert_eq!(resolved.session, "token-session"); assert_eq!(resolved.agent, "reviewer-1"); Ok(()) @@ -1308,8 +1323,7 @@ mod tests { workflow: Some("build".to_string()), }; let mut prompter = StaticPrompter::new(&[]); - let resolved = - resolve_run_context(&args, None, None, None, Some(&saved), &mut prompter)?; + let resolved = resolve_run_context(&args, None, None, None, Some(&saved), &mut prompter)?; assert_eq!(resolved.session, "saved-session"); assert_eq!(resolved.agent, "saved-agent"); assert_eq!(resolved.workflow.as_deref(), Some("build")); diff --git a/src/workflow/builtin.rs b/src/workflow/builtin.rs index 6cd9548..a8cb121 100644 --- a/src/workflow/builtin.rs +++ b/src/workflow/builtin.rs @@ -34,12 +34,12 @@ impl Workflow for PlanWorkflow { } fn supported_actions(&self) -> &'static [&'static str] { - &["file-issue", "challenge", "agree"] + &["file-issue", "position", "challenge", "agree"] } fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { let engine = PlanEngine::new(DefaultPlanPolicy); - engine.join(ctx.host.state)?; + engine.join(ctx.host.state, ctx.agent)?; Ok(JoinDispatch::default()) } @@ -91,7 +91,7 @@ impl Workflow for ComposedNegotiateWorkflow { } fn supported_actions(&self) -> &'static [&'static str] { - &["file-issue", "challenge", "agree"] + &["file-issue", "position", "challenge", "agree"] } fn on_create(&self, ctx: &mut super::CreateContext<'_>) -> Result { @@ -111,7 +111,7 @@ impl Workflow for ComposedNegotiateWorkflow { fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { let engine = PlanEngine::new(StrictNegotiationPolicy); - engine.join(ctx.host.state)?; + engine.join(ctx.host.state, ctx.agent)?; Ok(JoinDispatch::default()) } diff --git a/src/workflow/interop.rs b/src/workflow/interop.rs index ef5743b..758874e 100644 --- a/src/workflow/interop.rs +++ b/src/workflow/interop.rs @@ -26,23 +26,29 @@ pub fn delegate_join( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = JoinContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_join(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Join(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for join: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = JoinContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_join(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Join(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for join: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_next_poll( @@ -52,23 +58,29 @@ pub fn delegate_next_poll( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = NextPollContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_next_poll(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::NextPoll(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for next poll: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = NextPollContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_next_poll(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::NextPoll(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for next poll: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_done( @@ -78,23 +90,29 @@ pub fn delegate_done( target: DelegationTarget<'_>, agent: &str, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = DoneContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - }; - match dispatch_done(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Done(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for done: {:?}", - workflow.id(), - other - ), - } - }) + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = DoneContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_done(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Done(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for done: {:?}", + workflow.id(), + other + ), + } + }, + ) } pub fn delegate_action( @@ -106,26 +124,32 @@ pub fn delegate_action( name: &str, args: &Value, ) -> Result { - with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { - let mut ctx = ActionContext { - host: HostContextMut { - session_dir, - state: child, - }, - agent, - name, - args, - }; - match dispatch_action(workflow.as_ref(), &mut ctx)? { - WorkflowDispatch::Action(dispatch) => Ok(dispatch), - other => bail!( - "workflow '{}' returned unexpected delegation dispatch for action '{}': {:?}", - workflow.id(), + with_child_session( + parent, + session_dir, + registry, + target, + |session_dir, workflow, child| { + let mut ctx = ActionContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, name, - other - ), - } - }) + args, + }; + match dispatch_action(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Action(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for action '{}': {:?}", + workflow.id(), + name, + other + ), + } + }, + ) } pub fn delegate_status( @@ -177,7 +201,10 @@ where Ok(result) } -fn projected_child_session(parent: &SessionState, target: DelegationTarget<'_>) -> Result { +fn projected_child_session( + parent: &SessionState, + target: DelegationTarget<'_>, +) -> Result { validate_namespace(target.namespace)?; let mut child = parent.clone(); child.workflow_id = Some(target.workflow_id.to_string()); diff --git a/tests/command_install_run.rs b/tests/command_install_run.rs index 1b502ad..2ef43df 100644 --- a/tests/command_install_run.rs +++ b/tests/command_install_run.rs @@ -99,7 +99,13 @@ fn command_install_run_entrypoint_integration() -> Result<()> { let pi_root = temp_home.join(".pi"); let claude_root = temp_home.join(".claude"); let factory_root = temp_home.join(".factory"); - for root in [&codex_root, &droid_root, &pi_root, &claude_root, &factory_root] { + for root in [ + &codex_root, + &droid_root, + &pi_root, + &claude_root, + &factory_root, + ] { fs::create_dir_all(root)?; } @@ -137,7 +143,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { ®istry, ) .expect_err("unmanaged wrapper should require --force"); - assert!(err.to_string().contains("refusing to overwrite non-Rally file")); + assert!( + err.to_string() + .contains("refusing to overwrite non-Rally file") + ); run_cli( &["rally", "command", "doctor", "--target", "all", "--dry-run"], @@ -162,7 +171,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { ®istry, )?; assert!(!codex_wrapper.exists()); - assert_eq!(fs::read_to_string(&droid_wrapper)?, "custom wrapper content"); + assert_eq!( + fs::read_to_string(&droid_wrapper)?, + "custom wrapper content" + ); let todo_one = temp_home.join("todo-one.md"); let todo_two = temp_home.join("todo-two.md"); @@ -242,10 +254,7 @@ fn command_install_run_entrypoint_integration() -> Result<()> { let session_one_seen_before = read_agent_last_seen(&temp_home, &session_one, "implementer")?; assert_eq!( - run_cli( - &["rally", "command", "run", "--non-interactive"], - ®istry, - )?, + run_cli(&["rally", "command", "run", "--non-interactive"], ®istry,)?, 0 ); let session_one_seen_after = read_agent_last_seen(&temp_home, &session_one, "implementer")?; @@ -253,38 +262,44 @@ fn command_install_run_entrypoint_integration() -> Result<()> { assert_eq!( run_cli( - &[ - "rally", - "command", - "run", - &format!("rly:{session_two}:implementer"), - "--non-interactive", - ], - ®istry, - )?, + &[ + "rally", + "command", + "run", + &format!("rly:{session_two}:implementer"), + "--non-interactive", + ], + ®istry, + )?, 0 ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_two); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_two + ); assert_eq!( run_cli( - &[ - "rally", - "command", - "run", - "--session", - &session_one, - "--as", - "implementer", - "--invite", - &format!("rly:{session_two}:implementer"), - "--non-interactive", - ], - ®istry, - )?, + &[ + "rally", + "command", + "run", + "--session", + &session_one, + "--as", + "implementer", + "--invite", + &format!("rly:{session_two}:implementer"), + "--non-interactive", + ], + ®istry, + )?, 0 ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_one); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_one + ); fs::write(&context_store, "{not-json}")?; assert_eq!( @@ -327,7 +342,10 @@ fn command_install_run_entrypoint_integration() -> Result<()> { String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stderr) ); - assert_eq!(read_context_session(&temp_home, &manifest_dir)?, session_two); + assert_eq!( + read_context_session(&temp_home, &manifest_dir)?, + session_two + ); { let handle = SessionHandle::open(&session_two)?; diff --git a/tests/extensibility_mvp.rs b/tests/extensibility_mvp.rs index 5443868..9029c02 100644 --- a/tests/extensibility_mvp.rs +++ b/tests/extensibility_mvp.rs @@ -2,6 +2,7 @@ use std::{ fs, path::{Path, PathBuf}, sync::atomic::{AtomicUsize, Ordering}, + thread, time::{SystemTime, UNIX_EPOCH}, }; @@ -462,6 +463,20 @@ fn composed_negotiate_reuses_templates_and_enforces_strict_policy() -> Result<() let mut state = handle.load_state()?; state.phase = SessionPhase::Negotiation; let mut plan_state = rally_workflow_plan::read_workflow_state(&state)?; + if plan_state.issues.is_empty() { + plan_state.issues.push(rally_workflow_plan::IssueState { + id: 1, + slug: "issue-merge-rule".to_string(), + file: "01-issue-merge-rule.md".to_string(), + title: "Issue Merge Rule".to_string(), + author: "agent-a".to_string(), + status: rally_workflow_plan::IssueStatus::Open, + positions: Default::default(), + challenges: Default::default(), + agreed_by: None, + created_at: state::now(), + }); + } let issue = plan_state .issues .iter_mut() @@ -527,6 +542,505 @@ fn composed_negotiate_reuses_templates_and_enforces_strict_policy() -> Result<() Ok(()) } +#[test] +fn negotiate_happy_path_runs_analysis_positions_and_finalization() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-happy-path-it"); + let session_dir = state::session_dir(&session); + let _ = fs::remove_dir_all(&session_dir); + + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + "--topic", + "happy path", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + + for agent in ["agent-a", "agent-b", "agent-c"] { + write_file( + &session_dir + .join("sources") + .join(format!("{agent}-proposal.md")), + &format!("# {agent} proposal\n"), + ); + run_cli( + &["rally", "done", "--session", &session, "--as", agent], + ®istry, + )?; + } + + write_file( + &session_dir.join("analysis").join("agent-a.md"), + "# analysis a\n", + ); + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "file-issue", + "--args-json", + r#"{"title":"Happy path issue"}"#, + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + write_file( + &session_dir.join("analysis").join(format!("{agent}.md")), + &format!("# {agent} analysis\n"), + ); + run_cli( + &["rally", "done", "--session", &session, "--as", agent], + ®istry, + )?; + } + + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "position", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-a"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-b", + "--name", + "challenge", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-b"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-c", + "--name", + "challenge", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-c"], + ®istry, + )?; + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "agree", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + )?; + + assert_eq!( + commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "agent-a".to_string(), + timeout: Some(2), + }, + ®istry + )?, + 0 + ); + write_file( + &session_dir.join("coverage-audit.md"), + "# coverage\n- issue 1\n", + ); + write_file(&session_dir.join("final.md"), plan_final_doc()); + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-a"], + ®istry, + )?; + run_cli( + &["rally", "done", "--session", &session, "--as", "agent-b"], + ®istry, + )?; + + let handle = SessionHandle::open(&session)?; + let state = handle.load_state()?; + assert_eq!(state.phase, SessionPhase::Done); + assert!(session_dir.join("todo.md").exists()); + assert_eq!( + fs::read_to_string(session_dir.join("todo.md"))?, + fs::read_to_string(session_dir.join("final.md"))? + ); + drop(handle); + let _ = fs::remove_dir_all(&session_dir); + Ok(()) +} + +#[test] +fn negotiate_failure_cases_cover_identity_peer_timeout_agree_and_approver() -> Result<()> { + let registry = builtin_registry()?; + + let dup_session = unique_name("rally-negotiate-dup-id-it"); + let dup_dir = state::session_dir(&dup_session); + let _ = fs::remove_dir_all(&dup_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &dup_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + commands::join_with_registry( + &JoinArgs { + session: dup_session.clone(), + agent: "agent-a".to_string(), + timeout: None, + }, + ®istry, + )?; + let err = commands::join_with_registry( + &JoinArgs { + session: dup_session.clone(), + agent: "agent_a".to_string(), + timeout: None, + }, + ®istry, + ) + .expect_err("duplicate canonical identity should be rejected"); + assert!(err.to_string().contains("identity 'agent-a'")); + let _ = fs::remove_dir_all(&dup_dir); + + let timeout_session = unique_name("rally-negotiate-timeout-it"); + let timeout_dir = state::session_dir(&timeout_session); + let _ = fs::remove_dir_all(&timeout_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &timeout_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + "--review-timeout-secs", + "1", + ], + ®istry, + )?; + let handle = SessionHandle::open(&timeout_session)?; + let mut state = handle.load_state()?; + state.created_at = state::now() - chrono::Duration::seconds(600); + handle.save_state(&state)?; + drop(handle); + let err = commands::join_with_registry( + &JoinArgs { + session: timeout_session.clone(), + agent: "agent-a".to_string(), + timeout: None, + }, + ®istry, + ) + .expect_err("missing peers should fail closed after timeout"); + assert!(err.to_string().contains("registration failed closed")); + let _ = fs::remove_dir_all(&timeout_dir); + + let fail_session = unique_name("rally-negotiate-failures-it"); + let fail_dir = state::session_dir(&fail_session); + let _ = fs::remove_dir_all(&fail_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &fail_session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: fail_session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + let handle = SessionHandle::open(&fail_session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::Negotiation; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": { + "holder": "agent-a", + "started_at": state::now().to_rfc3339(), + "round": 1 + }, + "issues": [{ + "id": 1, + "slug": "failure-issue", + "file": "01-failure-issue.md", + "title": "Failure issue", + "author": "agent-a", + "status": "OPEN", + "positions": ["agent-a"], + "challenges": ["agent-a"], + "agreed_by": null, + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": "agent-a", + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &fail_session, + "--as", + "agent-a", + "--name", + "agree", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + ) + .expect_err("premature agree should fail"); + let err_text = err.to_string(); + assert!( + err_text.contains("cross-agent challenges") || err_text.contains("only your own positions") + ); + + let handle = SessionHandle::open(&fail_session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::FinalizationReview; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": null, + "issues": [{ + "id": 1, + "slug": "failure-issue", + "file": "01-failure-issue.md", + "title": "Failure issue", + "author": "agent-a", + "status": "AGREED", + "positions": ["agent-a", "agent-b"], + "challenges": ["agent-b", "agent-c"], + "agreed_by": "agent-a", + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": "agent-a", + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + write_file(&fail_dir.join("coverage-audit.md"), "# coverage\n"); + write_file(&fail_dir.join("final.md"), plan_final_doc()); + let err = run_cli( + &[ + "rally", + "done", + "--session", + &fail_session, + "--as", + "agent-a", + ], + ®istry, + ) + .expect_err("writer self-approval should be rejected"); + assert!(err.to_string().contains("cannot self-approve")); + let _ = fs::remove_dir_all(&fail_dir); + Ok(()) +} + +#[test] +fn invalid_concurrent_actions_do_not_corrupt_negotiate_state() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-concurrency-it"); + let session_dir = state::session_dir(&session); + let _ = fs::remove_dir_all(&session_dir); + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + let handle = SessionHandle::open(&session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::Negotiation; + state.workflow_state = Some(json!({ + "state_version": 2, + "turn": { + "holder": "agent-a", + "started_at": state::now().to_rfc3339(), + "round": 1 + }, + "issues": [{ + "id": 1, + "slug": "concurrency-issue", + "file": "01-concurrency-issue.md", + "title": "Concurrency issue", + "author": "agent-a", + "status": "OPEN", + "positions": [], + "challenges": [], + "agreed_by": null, + "created_at": state::now().to_rfc3339() + }], + "finalization": { + "writer": null, + "reviewer": null, + "todo_ready": false, + "reopened_for_issues": false + }, + "tracking": {} + })); + handle.save_state(&state)?; + drop(handle); + + let state_path = session_dir.join("state.json"); + let before = fs::read_to_string(&state_path)?; + thread::scope(|scope| { + for agent in ["agent-b", "agent-c"] { + let session = session.clone(); + scope.spawn(move || { + let registry = builtin_registry().expect("registry"); + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + agent, + "--name", + "position", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + ) + .expect_err("non-turn-holder action should fail"); + assert!(err.to_string().contains("currently 'agent-a' turn")); + }); + } + }); + let after = fs::read_to_string(&state_path)?; + assert_eq!( + before, after, + "invalid concurrent actions must not mutate state" + ); + let _ = fs::remove_dir_all(&session_dir); + Ok(()) +} + #[test] fn create_workflow_rejects_builtin_workflow_ids() -> Result<()> { let registry = builtin_registry()?;