diff --git a/ai.go b/ai.go index 8b81971a..f6620398 100644 --- a/ai.go +++ b/ai.go @@ -7636,6 +7636,11 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, decidedApps += lowername + ", " } + + // Let's inject http. + if !strings.Contains(decidedApps, "http") { + decidedApps += "http, " + } } if len(decidedApps) > 0 { @@ -8674,7 +8679,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( //log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s status=%s duration=%ds decisions=%d", execution.ExecutionId, agentOutput.Status, time.Now().Unix()-agentOutput.StartedAt, len(agentOutput.Decisions)) - if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status == "EXECUTING" { + if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status != "ABORTED" && execution.Status != "FAILURE" { duration := agentOutput.CompletedAt - agentOutput.StartedAt log.Printf("[INFO][%s] AI_AGENT_COMPLETE: org=%s duration=%ds decisions=%d llm_calls=%d total_tokens=%d status=SUCCESS", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) for resultIndex, result := range execution.Results { @@ -8726,6 +8731,10 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } } } + + if createNextActions { + return startNode, nil + } // 1. Map the response back newResult, err := json.Marshal(resultMapping) diff --git a/db-connector.go b/db-connector.go index 6fa106d9..e7e54a4e 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2017,7 +2017,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) continue } else if decision.RunDetails.Status == "FAILURE" { - //finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) failedFound = true continue } else if decision.RunDetails.Status == "RUNNING" && decision.Action != "ask" { @@ -2030,8 +2030,11 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor mappedOutput.Decisions[decisionIndex].RunDetails.Status = "FAILURE" mappedOutput.Decisions[decisionIndex].RunDetails.CompletedAt = time.Now().Unix() mappedOutput.Decisions[decisionIndex].RunDetails.RawResponse += "\n[ERROR] Decision marked as FAILURE due to 5 minute timeout." - } + // Count this as finished + failed so recovery triggers in the same Fixexecution run + finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + failedFound = true + } } else { if decision.RunDetails.CompletedAt > 0 { if debug { @@ -2133,7 +2136,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex]) }() } else { - log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found, marking as WAITING.", workflowExecution.ExecutionId, action.ID) + log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found. Re-invoking agent to finalize (failedFound: %t).", workflowExecution.ExecutionId, action.ID, failedFound) mappedOutput.Status = "RUNNING" mappedOutput.CompletedAt = 0 @@ -2143,10 +2146,16 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor workflowExecution.Status = "EXECUTING" } - // To ensure the execution is actually updated + // Re-invoke the agent so the LLM can see the failure and produce a proper "finish" decision. + + capturedExec := workflowExecution + capturedAction := action go func() { - time.Sleep(1 * time.Second) - sendAgentActionSelfRequest("WAITING", workflowExecution, workflowExecution.Results[resultIndex]) + time.Sleep(2 * time.Second) + _, err := HandleAiAgentExecutionStart(capturedExec, capturedAction, true) + if err != nil { + log.Printf("[ERROR][%s] Failed re-invoking agent after decisions completed for action %s: %s", capturedExec.ExecutionId, capturedAction.ID, err) + } }() } } else if (result.Status == "" || result.Status == "WAITING") && mappedOutput.Status == "FINISHED" { diff --git a/shared.go b/shared.go index 99753710..053e78a1 100644 --- a/shared.go +++ b/shared.go @@ -17767,7 +17767,15 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action // Handle agent decisionmaking. Use the same log.Printf("[INFO][%s] With the agent being finished, we are asking it whether it would like to do anything else", workflowExecution.ExecutionId) - returnAction, err := HandleAiAgentExecutionStart(workflowExecution, actionResult.Action, true) + var originalAction Action + if foundActionResultIndex >= 0 && foundActionResultIndex < len(workflowExecution.Results) { + originalAction = workflowExecution.Results[foundActionResultIndex].Action + } else { + // Fallback in case of an issue + originalAction = actionResult.Action + } + + returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true) if err != nil { log.Printf("[ERROR][%s] Failed handling agent execution start: %s", workflowExecution.ExecutionId, err) } @@ -22086,6 +22094,14 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by workflowExecution.OrgId = user.ActiveOrg.Id } + // formattedAppName := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") + + // isInternalShuffleApp := false + // switch formattedAppName { + // case "shuffle_datastore", "shuffle_org_management", "shuffle_app_management", "shuffle_workflow_management": + // isInternalShuffleApp = true + // } + if len(app.Name) == 0 && len(action.AppName) > 0 { app.Name = action.AppName }