diff --git a/ai.go b/ai.go index 4e6b09c9..a836f144 100644 --- a/ai.go +++ b/ai.go @@ -44,8 +44,8 @@ import ( var standalone bool // var model = "gpt-5-mini" -//var model = "gpt-5-mini" -var model = "gpt-5.4-nano" +var model = "gpt-5-mini" +//var model = "gpt-5.4-nano" //var model = "gpt-5.2-codex" var fallbackModel = "" @@ -1541,53 +1541,20 @@ func FixJSONNewlines(input string) string { } func FixContentOutput(contentOutput string) string { - if strings.Contains(contentOutput, "```json") { - // Handle ```json - start := strings.Index(contentOutput, "```json") - end := strings.Index(contentOutput, "```") - if start != -1 { - end = strings.Index(contentOutput[start+7:], "```") - - // Shift it so the index is at the correct place - end = end + start + 7 - } - - if start != -1 && end != -1 { - newend := end + 7 - newstart := start + 7 - - log.Printf("[INFO] Found ``` in content. Start: %d, end: %d", start, end) - - if newend > len(contentOutput) { - newend = end - } - - if newend > len(contentOutput) { - newend = len(contentOutput) - } - - if newstart > len(contentOutput) { - newstart = start - } - - if newstart > len(contentOutput) { - newstart = len(contentOutput) - } - - contentOutput = contentOutput[start+7 : newend] - } - } - - if strings.Contains(contentOutput, "```") { - start := strings.Index(contentOutput, "```") - end := strings.Index(contentOutput[start+3:], "```") - if start != -1 { - end = strings.Index(contentOutput[start+3:], "```") - end = end + start + 3 + // Safely extract content from ```json or ``` blocks + if start := strings.Index(contentOutput, "```json"); start != -1 { + start += 7 // skip ```json + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+3 : end+3] + } else if start := strings.Index(contentOutput, "```"); start != -1 { + start += 3 // skip ``` + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } } @@ -1706,6 +1673,166 @@ func balanceJSONLikeString(s string) string { return string(result) } +// normalizeRawDecisionFields converts any non-string 'Value' into a JSON-encoded string. +func normalizeRawDecisionFields(fields []rawField) { + for fieldIndex := range fields { + if fields[fieldIndex].Value == nil { + continue + } + + _, isAlreadyString := fields[fieldIndex].Value.(string) + if !isAlreadyString { + marshaledValueBytes, marshalErr := json.Marshal(fields[fieldIndex].Value) + if marshalErr == nil { + fields[fieldIndex].Value = string(marshaledValueBytes) + } + } + } +} + +// extractDecisionArray scans the text for the first '[' that successfully decodes into a valid array of decisions. +func extractDecisionArray(rawText string) ([]AgentDecision, error) { + for byteIndex := 0; byteIndex < len(rawText); byteIndex++ { + if rawText[byteIndex] != '[' { + continue + } + + var decodedRawDecisions []map[string]json.RawMessage + stringReader := strings.NewReader(rawText[byteIndex:]) + jsonDecoder := json.NewDecoder(stringReader) + decodeErr := jsonDecoder.Decode(&decodedRawDecisions) + + if decodeErr != nil || len(decodedRawDecisions) == 0 { + continue + } + + // Check if the first item has an "action" key + if _, hasAction := decodedRawDecisions[0]["action"]; !hasAction { + continue + } + + for mapIndex, rawMap := range decodedRawDecisions { + if rawFields, hasFields := rawMap["fields"]; hasFields { + var fields []rawField + if unmarshalErr := json.Unmarshal(rawFields, &fields); unmarshalErr == nil { + normalizeRawDecisionFields(fields) + + fixedFieldsBytes, marshalErr := json.Marshal(fields) + if marshalErr == nil { + decodedRawDecisions[mapIndex]["fields"] = fixedFieldsBytes + } + } + } + } + + marshaledJSONBytes, marshalErr := json.Marshal(decodedRawDecisions) + if marshalErr != nil { + continue + } + + var finalDecisions []AgentDecision + structUnmarshalErr := json.Unmarshal(marshaledJSONBytes, &finalDecisions) + if structUnmarshalErr != nil { + continue + } + + return finalDecisions, nil + } + + return nil, fmt.Errorf("no valid JSON array found") +} + +// extractDecisionJSONL scans the text for top-level '{' characters and extracts every valid JSON object. +func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { + var collectedDecisions []AgentDecision + byteIndex := 0 + + for byteIndex < len(rawText) { + if rawText[byteIndex] != '{' { + byteIndex++ + continue + } + + stringReader := strings.NewReader(rawText[byteIndex:]) + jsonDecoder := json.NewDecoder(stringReader) + + var rawMap map[string]json.RawMessage + decodeErr := jsonDecoder.Decode(&rawMap) + + bytesConsumedByDecoder := int(jsonDecoder.InputOffset()) + if bytesConsumedByDecoder <= 0 { + byteIndex++ + } else { + byteIndex += bytesConsumedByDecoder + } + + if decodeErr != nil { + continue + } + + if _, hasAction := rawMap["action"]; !hasAction { + continue + } + + // Fix the "fields" array if it exists + if rawFields, hasFields := rawMap["fields"]; hasFields { + var fields []rawField + if unmarshalErr := json.Unmarshal(rawFields, &fields); unmarshalErr == nil { + normalizeRawDecisionFields(fields) + fixedFieldsBytes, marshalErr := json.Marshal(fields) + if marshalErr == nil { + rawMap["fields"] = fixedFieldsBytes + } + } + } + + marshaledJSONBytes, marshalErr := json.Marshal(rawMap) + if marshalErr != nil { + continue + } + + var finalDecision AgentDecision + structUnmarshalErr := json.Unmarshal(marshaledJSONBytes, &finalDecision) + if structUnmarshalErr != nil { + continue + } + + collectedDecisions = append(collectedDecisions, finalDecision) + } + + if len(collectedDecisions) == 0 { + return nil, fmt.Errorf("no valid JSONL objects found") + } + + return collectedDecisions, nil +} + +// parseAgentDecisions extracts AgentDecision structs from messy LLM output. It tries multiple strategies in order: JSON array, JSONL, then array after unescaping. +func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { + cleanedText := FixContentOutput(rawOutput) + + //Try to parse as a JSON array + parsedDecisions, extractionErr := extractDecisionArray(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + // Try to parse as JSONL (one object per occurrence) + parsedDecisions, extractionErr = extractDecisionJSONL(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + // unescape quotes, then retry the array strategy + unescapedText := strings.ReplaceAll(cleanedText, `\"`, `"`) + parsedDecisions, extractionErr = extractDecisionArray(unescapedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + return nil, fmt.Errorf("failed to parse agent decisions from LLM output") +} + func AutofixAppLabels(ctx context.Context, app WorkflowApp, label string, keys []string) (WorkflowApp, WorkflowAppAction) { standalone := os.Getenv("STANDALONE") == "true" @@ -7821,6 +7948,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, _ = oldActionResult oldAgentOutput := AgentOutput{} previousAnswers := "" + continuationMessage := "" // Tracks user continuation text (new message sent to a finished agent) marshalledDecisions := []byte{} if createNextActions == true { @@ -8007,10 +8135,10 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, for _, field := range mappedDecision.Fields { if field.Key == "continue" && len(field.Answer) > 0 { if debug { - log.Printf("[DEBUG][%s] AI Agent continuation: overriding userMessage with 'continue' answer (length=%d)", execution.ExecutionId, len(field.Answer)) + log.Printf("[DEBUG][%s] AI Agent continuation: found 'continue' answer (length=%d); keeping original userMessage, adding as continuationMessage", execution.ExecutionId, len(field.Answer)) } - userMessage = field.Answer + continuationMessage = field.Answer foundContinuation = true break } @@ -8285,9 +8413,10 @@ You are an Action Execution Agent that performs actions in third-party tools. Yo ### INPUT PROTOCOL 1. **USER CONTEXT:** Available actions/tools. -2. **USER REQUEST:** Task to process. -3. **USER ANSWERS:** Explicit answers already provided by the user to prior agent questions. Treat these as authoritative context. -4. **HISTORY:** JSON list of previous executions (Newest First). +2. **ORIGINAL REQUEST (optional):** The user's prior request from this session, already completed. Visible in HISTORY. Use for context only — do NOT re-execute it. +3. **USER REQUEST:** The current task to complete. PHASE 1 checks THIS against HISTORY. +4. **USER ANSWERS:** Explicit answers already provided by the user to prior agent questions. Treat these as authoritative context. +5. **HISTORY:** JSON list of previous executions (Newest First). ### PHASE 1: COMPLETION CHECK (HIGHEST PRIORITY) **Compare the "USER REQUEST" against the "HISTORY".** @@ -8463,12 +8592,28 @@ data_filter: } } - // Fix e.g. injected JSON and other quote/newline mechanics that aren't compatible - // Problem: The input data itself can be a reference. - completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ - Role: openai.ChatMessageRoleUser, - Content: fmt.Sprintf("USER REQUEST: %s", userMessage), - }) + // Build the USER REQUEST message. + // For a normal run: USER REQUEST = the original user input. + // For a continuation (user sent a follow-up to a finished agent): the continuation is the live task that PHASE 1 should check against. The original question goes in as read-only context so the LLM knows the prior topic without re-executing it. + if len(continuationMessage) > 0 { + // Continuation run: new message is the actual task + if len(userMessage) > 0 { + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("ORIGINAL REQUEST (already completed, visible in HISTORY): %s", userMessage), + }) + } + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("USER REQUEST: %s", continuationMessage), + }) + } else { + // Normal run: original input is the task + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("USER REQUEST: %s", userMessage), + }) + } if len(previousAnswers) > 0 { completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ @@ -8477,8 +8622,8 @@ data_filter: }) } - if len(marshalledDecisions) > 4 { - completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage { + if len(marshalledDecisions) > 4 { + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ Role: openai.ChatMessageRoleUser, Content: fmt.Sprintf("HISTORY:\n%s", string(marshalledDecisions)), }) @@ -8749,7 +8894,7 @@ data_filter: } // Maps OpenAI -> Result struct so we can handle it - resultMapping := ActionResult{} + resultMapping = ActionResult{} err = json.Unmarshal(body, &resultMapping) if err != nil { log.Printf("[ERROR] AI Agent (2): Failed unmarshalling response into decisions. Response from sending AI Agent request to %s: %d - '%s'. Err: %s", fullUrl, llmStatusCode, string(body), err) @@ -8872,7 +9017,7 @@ data_filter: } // Parse the outputMap.Result to OpenAI response - choicesString := "" + // choicesString = "" bodyMap, ok := outputMap.Body.(map[string]interface{}) if !ok { log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result)) @@ -9000,49 +9145,26 @@ data_filter: // Found random JSON issues with [{} and similar, due to LLM instability. decisionString = FixContentOutput(choicesString) - // Find the first one and remove anything until that point conditionText := "conditions must be correct" - if !strings.HasPrefix(decisionString, `[`) { - firstIndex := strings.Index(decisionString, "[") - if firstIndex != -1 { - decisionString = decisionString[firstIndex:] - } else { - if !strings.Contains(decisionString, conditionText) { - log.Printf("[WARNING][%s] No '[' found in AI Agent response. Using full response: %s", execution.ExecutionId, decisionString) - } - } - } + errorMessage := "" - // LLM is occasionally appending freeform text like (e.g. "Summary: ...") after the closing bracket. Truncate everything past the last ']' so the JSON - // parser doesn't dont break due to that. - if lastBracket := strings.LastIndex(decisionString, "]"); lastBracket != -1 { - decisionString = decisionString[:lastBracket+1] - } + // Parse decisions using the refactored helper function + mappedDecisions, parsingErr := parseAgentDecisions(choicesString) - errorMessage := "" - mappedDecisions := []AgentDecision{} - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil { + if len(mappedDecisions) == 0 { + if parsingErr == nil { + parsingErr = errors.New("no valid AgentDecision array or objects found in output") + } if !strings.Contains(decisionString, conditionText) { - log.Printf("[ERROR][%s] AI Agent (5): Failed unmarshalling decisions in AI Agent response: %s", execution.ExecutionId, err) + log.Printf("[ERROR][%s] AI Agent (6): Failed parsing decisions in AI Agent response: %s. String: %s", execution.ExecutionId, parsingErr, decisionString) } - - if len(mappedDecisions) == 0 { - decisionString = strings.Replace(decisionString, `\"`, `"`, -1) - - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil && !strings.Contains(decisionString, conditionText) { - log.Printf("[ERROR][%s] AI Agent (6): Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - - // Updating the OUTPUT in some way to help the user a bit. - if strings.Contains(decisionString, "conditions must be correct") { - errorMessage = fmt.Sprintf("Condition failed. See decision_string for details") - resultMapping.Status = "SKIPPED" - } else { - resultMapping.Status = "FAILURE" - errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") - } - } + // Updating the OUTPUT in some way to help the user a bit. + if strings.Contains(decisionString, "conditions must be correct") { + errorMessage = fmt.Sprintf("Condition failed. See decision_string for details") + resultMapping.Status = "SKIPPED" + } else { + resultMapping.Status = "FAILURE" + errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") } } @@ -9492,14 +9614,14 @@ data_filter: } if !foundResult { - // duration := int64(0) - // if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { - // duration = agentOutput.CompletedAt - agentOutput.StartedAt - // } else if agentOutput.StartedAt > 0 { - // duration = time.Now().Unix() - agentOutput.StartedAt - // } + duration := int64(0) + if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { + duration = (agentOutput.CompletedAt - agentOutput.StartedAt) / 1000 + } else if agentOutput.StartedAt > 0 { + duration = (time.Now().UnixMilli() - agentOutput.StartedAt) / 1000 + } - // log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=SUCCESS duration=%ds decisions=%d llm_calls=%d tokens_used=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=FINISHED duration=%ds tool_calls=%d llm_calls=%d prompt_tokens=%d completion_tokens=%d total_tokens=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.PromptTokens, agentOutput.CompletionTokens, agentOutput.TotalTokens) } } diff --git a/blobs.go b/blobs.go index 77216fd0..6ad550a8 100644 --- a/blobs.go +++ b/blobs.go @@ -22,8 +22,8 @@ import ( func IsShuffleApp(app WorkflowApp) bool { parsedAppname := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") - skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffles_app_management"} - skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "605e31b19889e38f179fab112297eb42"} + skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffle_apps"} + skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "7db43ccd25261967b095cfbd467a75cc"} isShuffleApp := false if project.Environment == "cloud" && len(app.ID) > 0 { diff --git a/shared.go b/shared.go index 1ad2670e..ca5150e8 100644 --- a/shared.go +++ b/shared.go @@ -18157,15 +18157,27 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[INFO][%s] Agent continuation: resetting execution status from '%s' to 'EXECUTING' for continuation", workflowExecution.ExecutionId, workflowExecution.Status) workflowExecution.Status = "EXECUTING" workflowExecution.CompletedAt = 0 + } - executionCacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) - DeleteCache(ctx, executionCacheKey) - marshalledExec, marshalErr := json.Marshal(workflowExecution) - if marshalErr == nil { - SetCache(ctx, executionCacheKey, marshalledExec, 30) + if foundActionResultIndex >= 0 && foundActionResultIndex < len(workflowExecution.Results) { + if marshalledResult, marshalErr := json.Marshal(mappedResult); marshalErr == nil { + workflowExecution.Results[foundActionResultIndex].Result = string(marshalledResult) + + // push to the action result cache so GetWorkflowExecution inside HandleAiAgentExecutionStart picks up the fresh copy. + actionCacheId := fmt.Sprintf("%s_%s_result", workflowExecution.ExecutionId, actionResult.Action.ID) + go SetCache(ctx, actionCacheId, marshalledResult, 35) + } else { + log.Printf("[WARNING][%s] Failed to marshal updated mappedResult before HandleAiAgentExecutionStart: %s", workflowExecution.ExecutionId, marshalErr) } } + // so the GetWorkflowExecution fetch inside HandleAiAgentExecutionStart gets the fresh copy. + executionCacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) + DeleteCache(ctx, executionCacheKey) + if marshalledExec, marshalErr := json.Marshal(workflowExecution); marshalErr == nil { + SetCache(ctx, executionCacheKey, marshalledExec, 30) + } + callerName := "handleAgentDecisionStreamResult" returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true, callerName) if err != nil { @@ -26328,6 +26340,11 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h // Handles agentic run continues // This is if shuffler.io/agents => questions are answered + // Only process decision injection for the specific action result we are targeting + if result.Action.ID != start[0] { + continue + } + if agentic { log.Printf("[INFO][%s] Should fix the decision by injecting the values and continuing to the next step! :3", oldExecution.ExecutionId) @@ -37769,13 +37786,32 @@ func GetWorkflowMinimal(resp http.ResponseWriter, request *http.Request) { return } - // Fetch workflow - workflow, err := GetWorkflow(ctx, workflowId) - if err != nil { - log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) - resp.WriteHeader(404) - resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) - return + // Fetch workflow (try cache first so agent sees its draft) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowId) + cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) + + var workflow *Workflow + if cacheErr == nil && cachedWorkflow != nil { + if byteData, ok := cachedWorkflow.([]byte); ok { + workflow = &Workflow{} + err := json.Unmarshal(byteData, workflow) + if err != nil { + log.Printf("[WARNING] Failed unmarshaling cached workflow in GetWorkflowMinimal: %s", err) + workflow = nil + } + } + } + + // Fallback to DB if no cache + if workflow == nil { + var err error + workflow, err = GetWorkflow(ctx, workflowId) + if err != nil { + log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) + resp.WriteHeader(404) + resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) + return + } } // Permission check: user owns it OR user is in same org @@ -38239,84 +38275,153 @@ func enrichTriggerFromApp(minTrig *MinimalTrigger, environment string) (Trigger, } } -func broadcastToStream(workflowID string, operation WorkflowOperation, userID string, username string, authHeader string) { - // Convert SetOps operation to StreamOps format - item := "node" // default - switch operation.Op { - case "add_branch", "edit_branch", "delete_branch": - item = "branch" - case "add_condition", "edit_condition", "delete_condition": - item = "condition" - } - - if len(userID) == 0 { - userID = "agent" - } - if len(username) == 0 { - username = "agent" - } - - streamOp := StreamWorkflowOperation{ - Item: item, - Type: operation.Op, - ID: operation.ID, - UserID: userID, - Username: username, - Data: operation.Data, - Timestamp: time.Now().UnixMilli(), - } - - // Marshal to JSON - payload, err := json.Marshal(streamOp) - if err != nil { - log.Printf("[WARNING] Failed to marshal stream operation for workflow %s: %s", workflowID, err) - return - } - - baseURL := os.Getenv("BASE_URL") - if len(baseURL) == 0 { - if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 { - baseURL = os.Getenv("SHUFFLE_CLOUDRUN_URL") - } else { - port := os.Getenv("PORT") - if len(port) == 0 { - port = "5001" - } - baseURL = fmt.Sprintf("http://localhost:%s", port) - } - } - - streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, workflowID) - - // Create HTTP POST request - req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) - if err != nil { - log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", workflowID, err) - return - } - - // Set headers - req.Header.Set("Content-Type", "application/json") - if len(authHeader) > 0 { - req.Header.Set("Authorization", authHeader) - } - - // Make request with timeout - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", workflowID, err) - return - } - defer resp.Body.Close() - - // Log result - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Printf("[DEBUG] Streamed operation %s to workflow %s", operation.Op, workflowID) - } else { - log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, workflowID) - } -} +// func broadcastBatchToStream(wf *Workflow, operations []WorkflowOperation, tempIDMap map[string]string, userID string, username string, authHeader string) { +// if len(operations) == 0 { +// return +// } + +// if len(userID) == 0 { +// userID = "agent" +// } +// if len(username) == 0 { +// username = "agent" +// } + +// var streamOps []StreamWorkflowOperation + +// for _, operation := range operations { +// item := "node" // default +// opType := "" +// switch operation.Op { +// case "add_node": +// item = "node" +// opType = "add" +// case "move_node": +// item = "node" +// opType = "move" +// case "edit_node": +// item = "node" +// opType = "configure" +// case "delete_node": +// item = "node" +// opType = "remove" +// case "add_branch": +// item = "branch" +// opType = "add" +// case "edit_branch": +// item = "branch" +// opType = "configure" +// case "delete_branch": +// item = "branch" +// opType = "remove" +// case "save_workflow": +// item = "workflow" +// opType = "save" +// case "set_start_node": +// item = "workflow" +// opType = "configure" +// default: +// item = "node" +// opType = operation.Op +// } + +// opID := operation.ID +// if realID, exists := tempIDMap[operation.ID]; exists { +// opID = realID +// } else if realID, exists := tempIDMap[operation.TempID]; exists { +// opID = realID +// } + +// // Extract the ENRICHED node/branch from the workflow instead of using the Minimal payload +// // We only need the fully enriched data for CREATING nodes/branches. +// // For edits or moves, we just pass the partial payload the agent sent so the UI can patch it locally. +// var enrichedData interface{} +// if operation.Op == "add_node" { +// if operation.NodeType == "action" { +// if idx := findActionIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Actions[idx] +// } +// } else if operation.NodeType == "trigger" { +// if idx := findTriggerIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Triggers[idx] +// } +// } +// } else if operation.Op == "add_branch" { +// if idx := findBranchIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Branches[idx] +// } +// } + +// var finalData []byte +// if enrichedData != nil { +// finalData, _ = json.Marshal(enrichedData) +// } else { +// // Fallback for delete ops where the node is already removed from wf, or if not found +// finalData = operation.Data +// } + +// streamOps = append(streamOps, StreamWorkflowOperation{ +// Item: item, +// Type: opType, +// ID: opID, +// UserID: userID, +// Username: username, +// Data: finalData, +// Timestamp: time.Now().UnixMilli(), +// }) +// } + +// // Marshal to JSON array +// payload, err := json.Marshal(streamOps) +// if err != nil { +// log.Printf("[WARNING] Failed to marshal stream operations for workflow %s: %s", wf.ID, err) +// return +// } + +// baseURL := os.Getenv("BASE_URL") +// if len(baseURL) == 0 { +// if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 { +// baseURL = os.Getenv("SHUFFLE_CLOUDRUN_URL") +// } else { +// port := os.Getenv("PORT") +// if len(port) == 0 { +// port = "5001" +// } +// baseURL = fmt.Sprintf("http://localhost:%s", port) +// } +// } + +// streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, wf.ID) + +// // Create HTTP POST request +// req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) +// if err != nil { +// log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", wf.ID, err) +// return +// } + +// // Set headers +// req.Header.Set("Content-Type", "application/json") +// if len(authHeader) > 0 { +// req.Header.Set("Authorization", authHeader) +// } + +// // Make request with timeout +// client := &http.Client{Timeout: 10 * time.Second} +// resp, err := client.Do(req) +// if err != nil { +// log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", wf.ID, err) +// return +// } +// defer resp.Body.Close() + +// // Log result +// if resp.StatusCode >= 200 && resp.StatusCode < 300 { +// log.Printf("[DEBUG] Streamed %d operations to workflow %s", len(streamOps), wf.ID) +// } else { +// log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, wf.ID) +// } +// } func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) @@ -38342,22 +38447,15 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Extract workflow ID from URL location := strings.Split(request.URL.String(), "/") - var workflowID string + var urlWorkflowID string if len(location) > 4 && location[1] == "api" { - workflowID = location[4] - if strings.Contains(workflowID, "?") { - workflowID = strings.Split(workflowID, "?")[0] + urlWorkflowID = location[4] + if strings.Contains(urlWorkflowID, "?") { + urlWorkflowID = strings.Split(urlWorkflowID, "?")[0] } } - if len(workflowID) != 36 { - log.Printf("[WARNING] Invalid workflow ID: %s", workflowID) - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) - return - } - - // Parse request + // Parse request first so we can fallback to body's WorkflowID body, err := ioutil.ReadAll(request.Body) if err != nil { log.Printf("[WARNING] Failed reading request body: %s", err) @@ -38376,8 +38474,21 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { return } + workflowID := urlWorkflowID + if len(workflowID) != 36 { + // Fallback to body's WorkflowID if URL ID is invalid (e.g., %7Bkey%7D from MCP) + if len(setOpsReq.WorkflowID) == 36 { + workflowID = setOpsReq.WorkflowID + } else { + log.Printf("[WARNING] Invalid workflow ID: %s", urlWorkflowID) + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) + return + } + } + // Validate request - if setOpsReq.WorkflowID != workflowID { + if setOpsReq.WorkflowID != "" && setOpsReq.WorkflowID != workflowID { resp.WriteHeader(400) resp.Write([]byte(`{"success": false, "reason": "Workflow ID mismatch"}`)) return @@ -38390,7 +38501,7 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { } // Get workflow (from cache or DB) - cacheKey := fmt.Sprintf("workflow_ops_cache_%s_%s", workflowID, user.Id) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowID) cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) var workflow *Workflow @@ -38427,7 +38538,14 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Apply operations (all-or-nothing) with temp ID mapping tempIDMap := make(map[string]string) // Maps temp_id → real_id + shouldSaveDB := false + for opIndex, operation := range setOpsReq.Operations { + if operation.Op == "save_workflow" { + shouldSaveDB = true + continue + } + err = applyWorkflowOperationWithMapping(ctx, user, workflow, &operation, tempIDMap) if err != nil { errMsg := fmt.Sprintf(`{"success": false, "reason": "Operation %d failed: %s", "failed_at_op": %d}`, opIndex, err.Error(), opIndex) @@ -38453,6 +38571,61 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Don't fail the request, cache is best-effort } + if shouldSaveDB { + env := workflow.ExecutionEnvironment + if len(env) == 0 { + env = "Shuffle" + } + for i := range workflow.Actions { + if len(workflow.Actions[i].Environment) == 0 { + workflow.Actions[i].Environment = env + } + } + for i := range workflow.Triggers { + if len(workflow.Triggers[i].Environment) == 0 { + workflow.Triggers[i].Environment = env + } + } + + allAuths, authErr := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id) + if authErr != nil { + log.Printf("[WARNING] Could not load org auths for auto-hydration during save: %s", authErr) + } else { + for i := range workflow.Actions { + if workflow.Actions[i].AuthenticationId != "" { + continue // already has auth, skip + } + appID := workflow.Actions[i].AppID + appName := workflow.Actions[i].AppName + + var bestAuth *AppAuthenticationStorage + var bestEdited int64 = -1 + for j := range allAuths { + a := &allAuths[j] + if a.App.ID != appID && !strings.EqualFold(a.App.Name, appName) { + continue + } + if a.Edited > bestEdited { + bestEdited = a.Edited + bestAuth = a + } + } + if bestAuth != nil { + workflow.Actions[i].AuthenticationId = bestAuth.Id + //log.Printf("[INFO] Auto-assigned auth %s (%s) to action %s (%s)", bestAuth.Label, bestAuth.Id, workflow.Actions[i].Label, appName) + } + } + } + + err = SetWorkflow(ctx, *workflow, workflow.ID) + if err != nil { + log.Printf("[ERROR] Failed saving workflow to DB: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed to save workflow to database"}`)) + return + } + } + // Build response minWf := buildMinimalWorkflow(workflow) response := WorkflowSetOpsResponse{ @@ -38472,10 +38645,8 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Broadcast operations to stream endpoint (agent gets response immediately, streaming happens in background) // Extract auth header from incoming request to pass to stream endpoint - authHeader := request.Header.Get("Authorization") - for _, operation := range setOpsReq.Operations { - go broadcastToStream(workflowID, operation, user.Id, user.Username, authHeader) - } + // authHeader := request.Header.Get("Authorization") + // go broadcastBatchToStream(workflow, setOpsReq.Operations, tempIDMap, "agent", "Agent", authHeader) if debug{ log.Printf("[INFO] Applied %d operations to workflow %s for user %s", len(setOpsReq.Operations), workflowID, user.Username) @@ -38511,6 +38682,15 @@ func applyWorkflowOperationWithMapping(ctx context.Context, user User, wf *Workf case "delete_condition": return opDeleteCondition(wf, op) + // ====== WORKFLOW OPERATIONS ====== + case "set_start_node": + if realID, exists := tempIDMap[op.ID]; exists { + wf.Start = realID + } else { + wf.Start = op.ID + } + return nil + default: return fmt.Errorf("unknown operation: %s", op.Op) } @@ -38581,7 +38761,15 @@ func opAddNode(ctx context.Context, user User, wf *Workflow, op *WorkflowOperati // } // } - // Should we let the agent specify the position? If not, can we auto-calculate based on existing nodes ?? + // If the agent didn't specify a position (0,0), auto-layout to prevent stacking + if minAct.X == 0 && minAct.Y == 0 { + startX := -312.0 + y := 190.0 + xSpacing := 437.0 + minAct.X = int64(startX + float64(len(wf.Triggers)+len(wf.Actions))*xSpacing) + minAct.Y = int64(y) + } + newAction.Position = Position{ X: float64(minAct.X), Y: float64(minAct.Y), @@ -38798,7 +38986,7 @@ func opMoveNode(wf *Workflow, op *WorkflowOperation) error { return nil } - return fmt.Errorf("node %s not found in workflow (not an action or trigger)", op.ID) + return fmt.Errorf("node %s not found in workflow (not an action or trigger) for move", op.ID) } func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { @@ -38806,7 +38994,12 @@ func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { case "action": idx := findActionIndexByID(wf, op.ID) if idx == -1 { - return fmt.Errorf("action %s not found", op.ID) + // Already gone, idempotent no-op (e.g. cascade from a prior delete) + if debug { + log.Printf("[DEBUG] delete_node(action): action %s not found, already removed - skipping", op.ID) + } + + return nil } // Remove action @@ -38824,7 +39017,11 @@ func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { case "trigger": idx := findTriggerIndexByID(wf, op.ID) if idx == -1 { - return fmt.Errorf("trigger %s not found", op.ID) + // Already gone idempotent no-op + if debug { + log.Printf("[DEBUG] delete_node(trigger): trigger %s not found, already removed - skipping", op.ID) + } + return nil } wf.Triggers = append(wf.Triggers[:idx], wf.Triggers[idx+1:]...) @@ -38869,7 +39066,23 @@ func opAddBranchWithMapping(wf *Workflow, op *WorkflowOperation, tempIDMap map[s resolvedData, _ := json.Marshal(branchData) op.Data = resolvedData - return opAddBranch(wf, op) + err := opAddBranch(wf, op) + if err != nil { + return err + } + + // Map the newly generated branch ID back to the agent's temp IDs + if len(wf.Branches) > 0 { + realID := wf.Branches[len(wf.Branches)-1].ID + if len(op.TempID) > 0 { + tempIDMap[op.TempID] = realID + } + if len(op.ID) > 0 { + tempIDMap[op.ID] = realID + } + } + + return nil } func opAddBranch(wf *Workflow, op *WorkflowOperation) error { @@ -38939,7 +39152,13 @@ func opDeleteBranch(wf *Workflow, op *WorkflowOperation) error { return nil } } - return fmt.Errorf("branch %s not found", op.ID) + // Branch not found - this is OK if it was already removed as a consequence of + // a previous delete_node op. Treat as a no-op so the agent doesn't retry. + if debug { + log.Printf("[DEBUG] delete_branch: branch %s not found, already removed (likely cascade from delete_node) - skipping", op.ID) + } + + return nil }