Pipeline Steps (what each step does)
Step responsibilities
| Step | Primary responsibility | Main session mutations | Config/table dependencies |
|---|---|---|---|
| LoadOrCreateConversationStep | Fetch or bootstrap conversation row | conversation, intent/state/context sync | ce_conversation |
| CacheInspectAuditStep | Conditionally audit cache snapshot | none | convengine.audit.cache-inspector property |
| ResetConversationStep | Early explicit reset | intent/state/context/input params reset | input flags, command text |
| PersistConversationBootstrapStep | Ensure conversation row persisted | none/metadata | ce_conversation |
| AuditUserInputStep | Persist user input audit | none | ce_audit |
| PolicyEnforcementStep | Policy block and stop | payload + stop result on block | ce_policy |
| DialogueActStep | Classify user turn action type | dialogue_act, dialogue_act_confidence, standalone_query, resolved_user_input | ce_config (dialogue act mode), ce_audit |
| InteractionPolicyStep | Decide runtime policy before intent | policy_decision, skip_intent_resolution | ce_config, session pending state |
| CorrectionStep | Route confirmation turns and apply in-place correction patches | routing_decision, skip_schema_extraction, correction_applied | ce_output_schema, session context, ce_audit |
| ActionLifecycleStep | Maintain pending action runtime TTL/status | pending_action_runtime context | ce_pending_action, ce_audit |
| DisambiguationStep | Ask question when multiple actions fit | pending_clarification question/context | ce_pending_action, ce_config, ce_audit |
| GuardrailStep | Apply guardrails and approval rules | guardrail flags/sanitized text | ce_config, ce_audit |
| IntentResolutionStep | Resolve intent with classifier+agent | intent/state/clarification fields | ce_intent, ce_intent_classifier, ce_config |
| ResetResolvedIntentStep | Reset on configured reset intent | full reset | ce_config RESET_INTENT_CODES |
| FallbackIntentStateStep | Fill missing intent/state defaults | intent/state | none |
| AddContainerDataStep | Fetch and attach container data | containerData/context merge | ce_container_config |
| PendingActionStep | Execute/reject pending action task | pending_action_runtime status/result | ce_pending_action, CeTaskExecutor, ce_audit |
| ToolOrchestrationStep | Run tool_group based orchestration | tool_request/tool_result fields | ce_tool, ce_mcp_tool, ce_audit |
| McpToolStep | MCP planner/tool loop | context_json.mcp.*, mcp tool metadata | ce_mcp_tool, ce_mcp_db_tool, ce_mcp_planner (fallback ce_config) |
| SchemaExtractionStep | Schema-driven extraction and lock handling | schema facts/context/lock, POST_SCHEMA_EXTRACTION facts | ce_output_schema, ce_prompt_template |
| AutoAdvanceStep | Compute schema status facts | schemaComplete/hasAny | resolved schema + context |
| RulesStep | Match and apply transitions/actions | intent/state/input params | ce_rule |
| StateGraphStep | Validate state transition path | state_graph_valid/reason | ce_state_graph, ce_audit |
| ResponseResolutionStep | Resolve and generate output payload | payload/last assistant json | ce_response, ce_prompt_template |
| MemoryStep | Write memory/session summary | memory.session_summary in context | ce_memory, ce_audit |
| PersistConversationStep | Persist final conversation and result | finalResult | ce_conversation |
| PipelineEndGuardStep | Timing audit + terminal guard | timings | ce_audit |
- Before schema extraction: normalize inputs
- After rules: inspect intent/state transition correctness
- Before response resolution: inject display hints
Rules execute by phase. Available native phases include PRE_RESPONSE_RESOLUTION, POST_AGENT_INTENT, POST_SCHEMA_EXTRACTION, PRE_AGENT_MCP, POST_AGENT_MCP, and POST_TOOL_EXECUTION.
Prefer expressing transitions in ce_rule unless framework-level behavior is strictly required. This keeps domain behavior data-driven and testable via audit traces.
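To make phase-scoped execution concrete, here is a minimal sketch of selecting the rules that belong to one phase. Only the phase names come from the list above; the `Rule` record, the `RulePhase` enum as written here, the `priority` ordering, and `namesFor` are illustrative assumptions, not ConvEngine types.

```java
import java.util.Comparator;
import java.util.List;

final class RulePhaseSketch {
    // Phase names from the list above; this enum is a stand-in, not the engine's own type.
    enum RulePhase {
        POST_AGENT_INTENT, POST_SCHEMA_EXTRACTION, PRE_AGENT_MCP,
        POST_AGENT_MCP, POST_TOOL_EXECUTION, PRE_RESPONSE_RESOLUTION
    }

    // Hypothetical, simplified view of a ce_rule row.
    record Rule(String name, RulePhase phase, int priority) {}

    // Returns the names of the rules registered for one phase, lowest priority number first (assumed ordering).
    static List<String> namesFor(List<Rule> rules, RulePhase phase) {
        return rules.stream()
                .filter(rule -> rule.phase() == phase)
                .sorted(Comparator.comparingInt(Rule::priority))
                .map(Rule::name)
                .toList();
    }
}
```

A step that finishes schema extraction would, under these assumptions, ask only for the POST_SCHEMA_EXTRACTION rules and leave every other phase untouched.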
ce_prompt_template.interaction_mode and ce_prompt_template.interaction_contract are the preferred turn-semantics contract in v2.0.9+.
- SchemaExtractionStep uses the scoped SCHEMA_JSON template for extraction prompts; COLLECT plus expects:["structured_input"] is the recommended shape.
- CorrectionStep should rely on the active prompt template semantics (CONFIRM, PROCESSING, and interaction_contract capabilities such as affirm, edit, retry) instead of parsing state-name substrings.
- ResponseResolutionStep still selects by intent_code + state_code + response_type, but interaction_mode helps document what kind of user interaction that template represents.
LoadOrCreateConversationStep
Responsibility: Fetch or bootstrap conversation row
Session Mutations: conversation, intent/state/context sync
Config/Table Dependencies: ce_conversation
Detailed Execution Logic
This is the initial bootstrap step of the runtime engine. It uses the conversationId provided in the HTTP request to look up an existing CeConversation row in the Postgres database.
If the conversation exists:
- The context JSON is hydrated into the runtime EngineSession.
- Previous intent and state codes are restored.
- All stored memory summaries and pending actions are fetched from the database and loaded into memory.
If the conversation is new:
- A new CeConversation entity is instantiated.
- The intent and state default to UNKNOWN.
Because conversation state is reloaded from the database on every request, the API tier stays stateless regardless of scale, and any pod can serve any request.
public StepResult execute(EngineSession session) {
UUID id = session.getConversationId();
CeConversation convo = conversationRepo.findById(id).orElseGet(() -> createNewConversation(id, conversationRepo));
convo.setLastUserText(session.getUserText());
convo.setUpdatedAt(OffsetDateTime.now());
session.setConversation(convo);
session.syncFromConversation();
return new StepResult.Continue();
}
When the LLM path is used, DialogueActStep now audits:
- DIALOGUE_ACT_LLM_INPUT
- DIALOGUE_ACT_LLM_OUTPUT
- DIALOGUE_ACT_LLM_ERROR
It still emits the final classification checkpoint as DIALOGUE_ACT_CLASSIFIED.
CacheInspectAuditStep
Responsibility: Conditionally audit cache snapshot
Session Mutations: none
Config/Table Dependencies: convengine.audit.cache-inspector property
Detailed Execution Logic
Runs only when convengine.audit.cache-inspector is true. It takes a complete JSON snapshot of the hydrated EngineSession conversation cache tree and logs it under the CACHE_INSPECTION audit stage before the explicit user input audits are finalized.
public StepResult execute(EngineSession session) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("cache_snapshot", objectMapper.valueToTree(session.getConversation()));
auditService.audit(ConvEngineAuditStage.CACHE_INSPECTION, session.getConversationId(), payload);
return new StepResult.Continue();
}
ResetConversationStep
Responsibility: Early explicit reset
Session Mutations: intent/state/context/input params reset
Config/Table Dependencies: input flags, command text
Detailed Execution Logic
Checks EngineSession properties to see if an explicit reset has been triggered by the invoking consumer (this is usually passed as a param like _reset=true).
When triggered, it clears:
- The intent and state trackers.
- The contextJson (wiping all extracted schema facts).
- The inputParamsJson.
The session is marked as `RUNNING` again, but completely fresh. An audit event CONVERSATION_RESET is logged.
public StepResult execute(EngineSession session) {
if (!shouldReset(session)) {
return new StepResult.Continue();
}
String reason = resetReason(session);
session.resetForConversationRestart();
session.getConversation().setStatus("RUNNING");
session.getConversation().setIntentCode("UNKNOWN");
session.getConversation().setStateCode("UNKNOWN");
session.getConversation().setContextJson("{}");
session.getConversation().setInputParamsJson("{}");
session.getConversation().setLastAssistantJson(null);
session.getConversation().setUpdatedAt(OffsetDateTime.now());
conversationRepository.save(session.getConversation());
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, reason);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.getContextJson());
audit.audit(ConvEngineAuditStage.CONVERSATION_RESET, session.getConversationId(), payload);
return new StepResult.Continue();
}
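The listing above calls shouldReset(session) without showing it. The following is a minimal sketch of what such a check could look like, assuming the reset flag arrives as an input param named _reset (as in the example earlier) and that a few literal command phrases also count. The command list and the map-based signature are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

final class ResetTriggerSketch {
    // Hypothetical command phrases; the engine's real list is not shown in this document.
    private static final Set<String> RESET_COMMANDS = Set.of("reset", "restart", "/reset");

    // True when the caller passed _reset=true (Boolean or String) or typed a bare reset command.
    static boolean shouldReset(Map<String, ?> inputParams, String userText) {
        Object flag = inputParams.get("_reset");
        boolean flagged = flag != null && Boolean.parseBoolean(String.valueOf(flag));
        boolean commanded = userText != null
                && RESET_COMMANDS.contains(userText.trim().toLowerCase(Locale.ROOT));
        return flagged || commanded;
    }
}
```

Matching only the whole trimmed text keeps a sentence such as "reset my password" from wiping the conversation, which fits the conservative handling of destructive resets described for this pipeline.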
PersistConversationBootstrapStep
Responsibility: Ensure conversation row persisted
Session Mutations: none/metadata
Config/Table Dependencies: ce_conversation
Detailed Execution Logic
A simple lifecycle checkpoint that ensures the conversation has a createdAt timestamp. If the conversation was just created in LoadOrCreateConversationStep, this step performs the initial INSERT into ce_conversation so that foreign-key dependencies (like audit logs) don't fail later in the pipeline.
public StepResult execute(EngineSession session) {
if (session.getConversation().getCreatedAt() == null) {
session.getConversation().setCreatedAt(OffsetDateTime.now());
session.getConversation().setUpdatedAt(OffsetDateTime.now());
conversationRepo.save(session.getConversation());
}
return new StepResult.Continue();
}
AuditUserInputStep
Responsibility: Persist user input audit
Session Mutations: none
Config/Table Dependencies: ce_audit
Detailed Execution Logic
Records the raw text query the user typed on this turn into the ce_audit table. This is purely for debug tracing and business analytics. It binds the USER_INPUT audit stage with the conversation ID and the text payload.
public StepResult execute(EngineSession session) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.TEXT, session.getUserText());
audit.audit(ConvEngineAuditStage.USER_INPUT, session.getConversationId(), payload);
return new StepResult.Continue();
}
PolicyEnforcementStep
Responsibility: Policy block and stop
Session Mutations: payload + stop result on block
Config/Table Dependencies: ce_policy
Detailed Execution Logic
Secures the pipeline against prohibited input using ce_policy.
It reads all active rows from ce_policy, executing either REGEX, EXACT, or LLM rules against the user's raw text. If a match occurs:
- The conversation is forced to a `BLOCKED` status.
- A StepResult.Stop() is returned immediately, skipping all remaining NLP and intent steps.
- The ce_policy.response_text is loaded as the final payload shipped back to the consumer.
public StepResult execute(EngineSession session) {
String userText = session.getUserText();
for (CePolicy policy : policyRepo.findByEnabledTrueOrderByPriorityAsc()) {
if (matches(policy.getRuleType(), policy.getPattern(), userText)) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.POLICY_ID, policy.getPolicyId());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RULE_TYPE, policy.getRuleType());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.PATTERN, policy.getPattern());
audit.audit(ConvEngineAuditStage.POLICY_BLOCK, session.getConversationId(), payload);
session.getConversation().setStatus("BLOCKED");
session.getConversation().setLastAssistantJson(jsonText(policy.getResponseText()));
session.getConversation().setUpdatedAt(OffsetDateTime.now());
conversationRepo.save(session.getConversation());
EngineResult out = new EngineResult(
session.getIntent(),
session.getState(),
new TextPayload(policy.getResponseText()),
session.getContextJson()
);
return new StepResult.Stop(out);
}
}
return new StepResult.Continue();
}
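The matches(...) helper used in the loop above is not shown. Here is a minimal sketch of how REGEX and EXACT rules could be evaluated, assuming case-insensitive matching, substring search for REGEX, and whole-text comparison for EXACT. Those three choices are assumptions, and LLM rules are left out because they need a model call.

```java
import java.util.Locale;
import java.util.regex.Pattern;

final class PolicyMatchSketch {
    // Returns true when the user text violates the policy row described by ruleType and pattern.
    static boolean matches(String ruleType, String pattern, String userText) {
        if (ruleType == null || pattern == null || userText == null) {
            return false;
        }
        return switch (ruleType.trim().toUpperCase(Locale.ROOT)) {
            // REGEX: the pattern may occur anywhere in the text (assumed).
            case "REGEX" -> Pattern.compile(pattern, Pattern.CASE_INSENSITIVE).matcher(userText).find();
            // EXACT: the whole trimmed text must equal the pattern, ignoring case (assumed).
            case "EXACT" -> userText.trim().equalsIgnoreCase(pattern.trim());
            // LLM and unknown rule types are not evaluated in this sketch.
            default -> false;
        };
    }
}
```

Compiling the pattern on every call is fine for a sketch; a production version would more likely cache compiled patterns per policy row.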
DialogueActStep
Responsibility: Classify user turn action type
Session Mutations: dialogue_act in input params
Config/Table Dependencies: ce_config (dialogue act mode), ce_audit
Detailed Execution Logic
This step classifies the raw user text into an explicit conversational "act". It tries regex patterns first and, depending on the mode configured in convengine.flow.dialogue-act.resolute (e.g., REGEX_THEN_LLM), can fall back to an LLM request.
Why this step exists:
- it turns raw free-form phrases like "yes", "go ahead", "change amount to 350000", or "start over" into stable engine signals
- downstream steps should not branch on raw text because raw phrasing is inconsistent and expensive to reason about repeatedly
- it reduces unnecessary intent/schema/LLM work for short operational turns
- it makes ce_rule conditions deterministic by giving rules normalized fields instead of unstructured user text
If convengine.flow.queryRewrite.enabled=true and conversionHistory is present, the step extends its LLM request so that it acts as both a classifier and a RAG query rewriter. It supplies the ongoing conversation to the LLM, which rewrites ambiguous references (such as pronouns) into an explicit standalone search query and returns it as "standaloneQuery".
The core parameters for these prompts are loaded from ce_config, so administrators can adjust classifier behavior in production without recompiling the Java code.
Supported Dialogue Acts
| Enum Name | Purpose |
|---|---|
| AFFIRM | The user expressed agreement or confirmation (e.g., "yes", "go ahead"). Can override Guardrails or execute Pending Actions. |
| NEGATE | The user expressed rejection or cancellation (e.g., "no", "stop"). Can cancel active Interaction Policies. |
| EDIT | The user wants to change previously supplied context or entity slots. |
| RESET | The user wants to clear memory and start completely fresh. |
| QUESTION | The user is asking a direct conversational question (triggers fallback intent workflows). |
| NEW_REQUEST | The baseline generic classification. Engine routes normally. |
| GREETING | The user issued a pleasantry ("Hi", "Hello"). Typically bypassed by downstream Orchestrators to prevent expensive RAG queries. |
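As an illustration of the regex-first pass, the sketch below maps short operational phrases to the acts in the table and falls back to NEW_REQUEST for everything else. The patterns are illustrative placeholders, since the engine loads its own from ce_config, and the class and method shapes are assumptions rather than the engine's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class RegexDialogueActSketch {
    enum DialogueAct { AFFIRM, NEGATE, EDIT, RESET, QUESTION, NEW_REQUEST, GREETING }

    // Illustrative patterns only; insertion order decides which act wins on overlap.
    private static final Map<DialogueAct, Pattern> PATTERNS = new LinkedHashMap<>();

    static {
        PATTERNS.put(DialogueAct.GREETING, compile("^\\s*(hi|hello|hey)\\s*[.!]*$"));
        PATTERNS.put(DialogueAct.AFFIRM, compile("^\\s*(yes|yep|ok|okay|go ahead|confirm)\\s*[.!]*$"));
        PATTERNS.put(DialogueAct.NEGATE, compile("^\\s*(no|nope|stop|cancel)\\s*[.!]*$"));
        PATTERNS.put(DialogueAct.RESET, compile("^\\s*(reset|start over)\\s*[.!]*$"));
    }

    private static Pattern compile(String regex) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
    }

    // Whole-text match against each pattern; anything unmatched is the generic NEW_REQUEST.
    static DialogueAct classifyByRegex(String userText) {
        if (userText == null || userText.isBlank()) {
            return DialogueAct.NEW_REQUEST;
        }
        for (Map.Entry<DialogueAct, Pattern> entry : PATTERNS.entrySet()) {
            if (entry.getValue().matcher(userText).matches()) {
                return entry.getKey();
            }
        }
        return DialogueAct.NEW_REQUEST;
    }
}
```

Because the patterns require a whole-text match, a longer sentence that merely contains "change" or "no" stays NEW_REQUEST, which is the case where an LLM candidate becomes useful.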
Source Execution Profile
The runtime still keeps a conservative regex guard for destructive resets. RESET can be forced back to a REGEX_GUARD result when the user text does not clearly match the reset regex. EDIT is no longer globally downgraded in Java. Instead, the engine preserves regex and LLM candidate values, and a POST_DIALOGUE_ACT rule pass can apply a DB-driven override using SET_DIALOGUE_ACT when the workflow should trust the LLM candidate.
Why POST_DIALOGUE_ACT exists
The engine should stay conservative by default, but business workflows still need a safe override point.
Examples:
- user says: "Ohh wait, I missed one zero. Change amount to 350000."
- regex may still classify this as NEW_REQUEST
- the LLM candidate may classify it as EDIT
- a POST_DIALOGUE_ACT rule can inspect:
  - inputParams.dialogue_act
  - inputParams.dialogue_act_source
  - inputParams.dialogue_act_llm_candidate
  - inputParams.dialogue_act_llm_standalone_query
- then use SET_DIALOGUE_ACT to restore EDIT before InteractionPolicyStep
This keeps:
- the default engine behavior safe
- the override behavior DB-driven
- the state model dynamic instead of hardcoded in Java
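In the engine this override lives in a ce_rule row, not in Java. Purely to show the logic such a rule encodes, the sketch below applies the same condition and action to a plain input-params map. The key names come from the list above; the method itself and its boolean return value are hypothetical.

```java
import java.util.Map;

final class PostDialogueActOverrideSketch {
    // Restores EDIT when the guarded result is NEW_REQUEST but the LLM candidate said EDIT.
    // Returns true when the override was applied.
    static boolean applyEditOverride(Map<String, Object> inputParams) {
        Object guarded = inputParams.get("dialogue_act");
        Object llmCandidate = inputParams.get("dialogue_act_llm_candidate");
        if ("NEW_REQUEST".equals(guarded) && "EDIT".equals(llmCandidate)) {
            // Equivalent in spirit to the SET_DIALOGUE_ACT rule action.
            inputParams.put("dialogue_act", "EDIT");
            return true;
        }
        return false;
    }
}
```

Keeping the condition this narrow means the rule only trusts the LLM for the one upgrade the workflow wants and leaves every other guarded result as the engine produced it.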
@MustRunAfter(AuditUserInputStep.class)
@MustRunBefore(IntentResolutionStep.class)
public class DialogueActStep implements EngineStep {
private final ConvEngineFlowConfig flowConfig;
private final CeConfigResolver configResolver;
// Extracted from ce_config during Application Initialization
private Pattern REGEX_GREETING;
private Pattern REGEX_AFFIRM;
private Pattern REGEX_NEGATE;
private String SYSTEM_PROMPT;
private String QUERY_REWRITE_SYSTEM_PROMPT;
// ...
@PostConstruct
public void init() {
// Core Regex Evaluation Patterns mapped securely in ce_config
REGEX_GREETING = Pattern.compile(
configResolver.resolveString(this, "REGEX_GREETING", "^(\\s)*(hi|hello|hey|greetings|good morning|...)$"),
Pattern.CASE_INSENSITIVE
);
// Loads from ce_config where config_type="DialogueActStep" and config_key="SYSTEM_PROMPT".
SYSTEM_PROMPT = configResolver.resolveString(this, "SYSTEM_PROMPT", """
You are a dialogue-act classifier.
Return JSON only with:
{"dialogueAct":"AFFIRM|NEGATE|EDIT|RESET|QUESTION|NEW_REQUEST|GREETING","confidence":0.0}
""");
// Loads the query-rewrite variant
QUERY_REWRITE_SYSTEM_PROMPT = configResolver.resolveString(this, "QUERY_REWRITE_SYSTEM_PROMPT", """
You are a dialogue-act classifier and intelligent query search rewriter.
Using the conversation history, rewrite the user's text into an explicit, standalone query that perfectly describes their intent without needing the conversation history context.
Also classify their dialogue act.
Return JSON only matching the exact schema.
""");
}
@Override
public StepResult execute(EngineSession session) {
String userText = session.getUserText();
// 1. Regex Classification First
DialogueActResult regexResult = classifyByRegex(userText);
// 2. Resolve final intention while preserving regex + LLM candidates
DialogueActResolution resolution = resolveByMode(session, userText, regexResult);
DialogueActResult resolved = resolution.resolved();
// 3. Persist regex, LLM candidate, guard metadata, and final decision
session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_REGEX, regexResult.act().name());
session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_LLM_CANDIDATE, ...);
session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT, resolved.act().name());
session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_CONFIDENCE, resolved.confidence());
if (resolved.standaloneQuery() != null) {
session.putInputParam(ConvEngineInputParamKey.STANDALONE_QUERY, resolved.standaloneQuery());
session.setStandaloneQuery(resolved.standaloneQuery());
}
// 4. Let DB rules override the guarded result before policy routing
rulesStep.applyRules(session, "DialogueActStep", RulePhase.POST_DIALOGUE_ACT.name());
return new StepResult.Continue();
}
}
InteractionPolicyStep
Responsibility: Decide runtime policy before intent
Session Mutations: policy_decision, skip_intent_resolution
Config/Table Dependencies: ce_config, session pending state
Detailed Execution Logic
Uses the identified DialogueAct to decide how the engine should route the turn. This step prevents the system from confusing follow-up answers (like saying "yes") with new intents.
Before this step runs, the engine now executes a POST_DIALOGUE_ACT rule pass. That lets DB rules override guarded dialogue-act outcomes using the regex result, the LLM candidate result, or the dedicated SET_DIALOGUE_ACT action without hardcoding state names in Java.
The output maps to the InteractionPolicyDecision enum:
- EXECUTE_PENDING_ACTION: If the DialogueAct is AFFIRM and there's a background API task waiting.
- REJECT_PENDING_ACTION: If NEGATE and an action is waiting.
- FILL_PENDING_SLOT: If the user is currently answering a schema extraction question.
- RECLASSIFY_INTENT: If this is a NEW_REQUEST.
Evaluation Matrix Flow:
- Checks for context hints: hasPendingAction, hasPendingSlot, hasResolvedIntent, hasResolvedState.
- First, it attempts a lookup using the resolveFromMatrix() method against any custom configurations in your YAML properties.
- If no custom matrix decision applies, it checks boolean flags from the config.
  - isExecutePendingOnAffirm(): If true, AFFIRM + hasPendingAction -> EXECUTE_PENDING_ACTION
  - isRejectPendingOnNegate(): If true, NEGATE + hasPendingAction -> REJECT_PENDING_ACTION
  - isFillPendingSlotOnNonNewRequest(): If true, != NEW_REQUEST && != GREETING + hasPendingSlot -> FILL_PENDING_SLOT
- Most crucially, if any of these policy decisions triggers, it sets skipIntentResolution = true. This stops Step 11 (IntentResolutionStep) from overriding the sticky intent. POLICY_DECISION is attached to the audit payload.
public StepResult execute(EngineSession session) {
String dialogueActRaw = session.inputParamAsString(ConvEngineInputParamKey.DIALOGUE_ACT);
DialogueAct dialogueAct = parseDialogueAct(dialogueActRaw);
Map<String, Object> context = session.contextDict();
Map<String, Object> inputParams = session.getInputParams();
boolean hasPendingAction = hasValue(context.get("pending_action"))
|| hasValue(context.get("pendingAction"))
|| hasValue(inputParams.get("pending_action"))
|| hasValue(inputParams.get("pendingAction"))
|| hasValue(inputParams.get(ConvEngineInputParamKey.PENDING_ACTION_KEY))
|| hasValue(inputParams.get("pending_action_task"))
|| hasPendingActionFromRegistry(session);
boolean hasPendingSlot = hasValue(context.get("pending_slot"))
|| hasValue(context.get("pendingSlot"));
boolean hasResolvedIntent = session.getIntent() != null
&& !session.getIntent().isBlank()
&& !"UNKNOWN".equalsIgnoreCase(session.getIntent());
boolean hasResolvedState = session.getState() != null
&& !session.getState().isBlank()
&& !"UNKNOWN".equalsIgnoreCase(session.getState());
boolean requireResolvedIntentAndState = flowConfig.getInteractionPolicy().isRequireResolvedIntentAndState();
boolean hasResolvedContext = !requireResolvedIntentAndState || (hasResolvedIntent && hasResolvedState);
InteractionPolicyDecision decision = InteractionPolicyDecision.RECLASSIFY_INTENT;
boolean skipIntentResolution = false;
if (hasResolvedContext) {
InteractionPolicyDecision matrixDecision = resolveFromMatrix(hasPendingAction, hasPendingSlot, dialogueAct);
if (matrixDecision != null) {
decision = matrixDecision;
skipIntentResolution = true;
} else if (flowConfig.getInteractionPolicy().isExecutePendingOnAffirm()
&& hasPendingAction
&& dialogueAct == DialogueAct.AFFIRM) {
decision = InteractionPolicyDecision.EXECUTE_PENDING_ACTION;
skipIntentResolution = true;
} else if (flowConfig.getInteractionPolicy().isRejectPendingOnNegate()
&& hasPendingAction
&& dialogueAct == DialogueAct.NEGATE) {
decision = InteractionPolicyDecision.REJECT_PENDING_ACTION;
skipIntentResolution = true;
} else if (flowConfig.getInteractionPolicy().isFillPendingSlotOnNonNewRequest()
&& hasPendingSlot
&& dialogueAct != DialogueAct.NEW_REQUEST) {
decision = InteractionPolicyDecision.FILL_PENDING_SLOT;
skipIntentResolution = true;
}
}
session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, decision.name());
session.putInputParam(ConvEngineInputParamKey.SKIP_INTENT_RESOLUTION, skipIntentResolution);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(ConvEnginePayloadKey.DIALOGUE_ACT, dialogueAct.name());
payload.put(ConvEnginePayloadKey.POLICY_DECISION, decision.name());
payload.put(ConvEnginePayloadKey.SKIP_INTENT_RESOLUTION, skipIntentResolution);
payload.put("hasPendingAction", hasPendingAction);
payload.put("hasPendingSlot", hasPendingSlot);
payload.put(ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(ConvEnginePayloadKey.STATE, session.getState());
audit.audit(ConvEngineAuditStage.INTERACTION_POLICY_DECIDED, session.getConversationId(), payload);
return new StepResult.Continue();
}
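The flag-based fallback in the listing above can be restated as a small pure function, which is convenient for checking individual combinations. This sketch assumes all three config flags are enabled, no custom matrix entry applies, and the resolved-context check has already passed. It mirrors the listing, which only excludes NEW_REQUEST in the slot branch. The enum and method names are local to the sketch.

```java
final class InteractionPolicySketch {
    enum DialogueAct { AFFIRM, NEGATE, EDIT, RESET, QUESTION, NEW_REQUEST, GREETING }

    enum Decision { EXECUTE_PENDING_ACTION, REJECT_PENDING_ACTION, FILL_PENDING_SLOT, RECLASSIFY_INTENT }

    // Same branch order as the listing: pending action checks first, then pending slot.
    static Decision decide(DialogueAct act, boolean hasPendingAction, boolean hasPendingSlot) {
        if (hasPendingAction && act == DialogueAct.AFFIRM) {
            return Decision.EXECUTE_PENDING_ACTION;
        }
        if (hasPendingAction && act == DialogueAct.NEGATE) {
            return Decision.REJECT_PENDING_ACTION;
        }
        if (hasPendingSlot && act != DialogueAct.NEW_REQUEST) {
            return Decision.FILL_PENDING_SLOT;
        }
        return Decision.RECLASSIFY_INTENT;
    }

    // Without a custom matrix, intent resolution is skipped exactly when a non-default branch fired.
    static boolean skipsIntentResolution(Decision decision) {
        return decision != Decision.RECLASSIFY_INTENT;
    }
}
```

For example, "yes" with a pending action yields EXECUTE_PENDING_ACTION and skips intent resolution, while the same "yes" with nothing pending falls through to RECLASSIFY_INTENT.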
ActionLifecycleStep
Responsibility: Maintain pending action runtime TTL/status
Session Mutations: pending_action_runtime context
Config/Table Dependencies: ce_pending_action, ce_audit
Detailed Execution Logic
Tracks time-to-live (TTL) for CePendingAction rows. If the user was asked "Are you sure you want to cancel?" 3 turns ago, but started talking about the weather instead, this step will mark the pending_action_runtime as EXPIRED.
Status transitions (Enum PendingActionStatus):
- OPEN: Task is created but waiting for user confirmation.
- IN_PROGRESS: The user affirmed, and the task is ready to execute.
- REJECTED: The user negated.
- EXPIRED: The TTL turn limit was reached before the user confirmed.
public StepResult execute(EngineSession session) {
if (!flowConfig.getActionLifecycle().isEnabled()) {
return new StepResult.Continue();
}
ObjectNode root = contextHelper.readRoot(session);
ObjectNode runtime = contextHelper.ensureObject(root, RUNTIME_NODE);
int currentTurn = session.conversionHistory().size() + 1;
long now = Instant.now().toEpochMilli();
PendingActionStatus currentStatus = PendingActionStatus.from(runtime.path("status").asText(null), null);
if (isExpired(runtime, currentTurn, now) && (currentStatus == PendingActionStatus.OPEN || currentStatus == PendingActionStatus.IN_PROGRESS)) {
runtime.put("status", PendingActionStatus.EXPIRED.name());
runtime.put("expired_turn", currentTurn);
runtime.put("expired_at_epoch_ms", now);
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.EXPIRED.name());
audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
"event", "EXPIRED",
"status", PendingActionStatus.EXPIRED.name(),
"turn", currentTurn
));
}
String actionKey = resolveActionKey(session);
String actionRef = resolveActionReferenceFromTable(session, actionKey);
if (actionRef == null || actionRef.isBlank()) {
contextHelper.writeRoot(session, root);
return new StepResult.Continue();
}
boolean isNewRuntime = isRuntimeNew(runtime, actionKey, actionRef);
if (isNewRuntime) {
runtime.put("action_key", actionKey == null ? "" : actionKey);
runtime.put("action_ref", actionRef);
runtime.put("status", PendingActionStatus.OPEN.name());
runtime.put("created_turn", currentTurn);
runtime.put("created_at_epoch_ms", now);
runtime.put("expires_turn", flowConfig.getActionLifecycle().getTtlTurns() > 0
? currentTurn + flowConfig.getActionLifecycle().getTtlTurns()
: -1);
runtime.put("expires_at_epoch_ms", flowConfig.getActionLifecycle().getTtlMinutes() > 0
? now + (flowConfig.getActionLifecycle().getTtlMinutes() * 60_000L)
: -1);
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.OPEN.name());
audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
"event", "OPEN",
"status", PendingActionStatus.OPEN.name(),
"actionKey", actionKey,
"actionRef", actionRef
));
}
InteractionPolicyDecision decision = parseDecision(session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
if (decision == InteractionPolicyDecision.EXECUTE_PENDING_ACTION) {
runtime.put("status", PendingActionStatus.IN_PROGRESS.name());
runtime.put("in_progress_turn", currentTurn);
runtime.put("in_progress_at_epoch_ms", now);
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.IN_PROGRESS.name());
audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
"event", "IN_PROGRESS",
"status", PendingActionStatus.IN_PROGRESS.name(),
"actionKey", actionKey,
"actionRef", actionRef
));
} else if (decision == InteractionPolicyDecision.REJECT_PENDING_ACTION) {
runtime.put("status", PendingActionStatus.REJECTED.name());
runtime.put("rejected_turn", currentTurn);
runtime.put("rejected_at_epoch_ms", now);
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.REJECTED.name());
audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
"event", "REJECTED",
"status", PendingActionStatus.REJECTED.name(),
"actionKey", actionKey,
"actionRef", actionRef
));
}
contextHelper.writeRoot(session, root);
return new StepResult.Continue();
}
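The isExpired(...) check is referenced above but not shown. Here is a minimal sketch under two assumptions: a value of -1 (or a missing field) means "no limit", which matches how expires_turn and expires_at_epoch_ms are written in the listing, and expiry is strict (the limit itself is still valid). A plain map stands in for the JSON runtime node.

```java
import java.util.Map;

final class PendingActionExpirySketch {
    // Expired when either the turn limit or the wall-clock limit has been passed.
    static boolean isExpired(Map<String, Long> runtime, int currentTurn, long nowEpochMs) {
        long expiresTurn = runtime.getOrDefault("expires_turn", -1L);
        long expiresAtEpochMs = runtime.getOrDefault("expires_at_epoch_ms", -1L);
        boolean turnExpired = expiresTurn >= 0 && currentTurn > expiresTurn;
        boolean timeExpired = expiresAtEpochMs >= 0 && nowEpochMs > expiresAtEpochMs;
        return turnExpired || timeExpired;
    }
}
```

With a turn TTL of 3 and an action opened on turn 2, expires_turn is 5, so under these assumptions the action is still live on turn 5 and expires on turn 6.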
DisambiguationStep
Responsibility: Ask question when multiple actions fit
Session Mutations: pending_clarification question/context
Config/Table Dependencies: ce_pending_action, ce_config, ce_audit
Detailed Execution Logic
A smart conversational router. If multiple pending actions apply to the current context (e.g., "Cancel flight" vs "Cancel hotel" both valid), it pauses the pipeline.
It dynamically builds a multiple-choice prompt (or LLM synthesis) asking the user to clarify which action they meant. It emits an ASSISTANT_OUTPUT step, stalling the pipeline until the user clarifies.
public StepResult execute(EngineSession session) {
if (!flowConfig.getDisambiguation().isEnabled()) {
return new StepResult.Continue();
}
InteractionPolicyDecision decision = parseDecision(session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
if (decision != InteractionPolicyDecision.EXECUTE_PENDING_ACTION) {
return new StepResult.Continue();
}
String explicitActionKey = session.inputParamAsString(ConvEngineInputParamKey.PENDING_ACTION_KEY);
if (explicitActionKey != null && !explicitActionKey.isBlank()) {
return new StepResult.Continue();
}
List<CePendingAction> candidates = pendingActionRepository.findEligibleByIntentAndStateOrderByPriorityAsc(
session.getIntent(),
session.getState()
);
if (candidates == null || candidates.size() <= 1) {
return new StepResult.Continue();
}
int bestPriority = candidates.getFirst().getPriority() == null ? Integer.MAX_VALUE : candidates.getFirst().getPriority();
List<CePendingAction> top = candidates.stream()
.filter(c -> (c.getPriority() == null ? Integer.MAX_VALUE : c.getPriority()) == bestPriority)
.toList();
if (top.size() <= 1) {
return new StepResult.Continue();
}
Set<String> options = new LinkedHashSet<>();
for (CePendingAction row : top) {
if (row.getActionKey() == null || row.getActionKey().isBlank()) {
continue;
}
String option = row.getActionKey().trim();
if (row.getDescription() != null && !row.getDescription().isBlank()) {
option = option + " (" + row.getDescription().trim() + ")";
}
options.add(option);
if (options.size() >= Math.max(1, flowConfig.getDisambiguation().getMaxOptions())) {
break;
}
}
if (options.isEmpty()) {
return new StepResult.Continue();
}
QuestionResult questionResult = buildQuestion(session, top, options);
String question = questionResult.question();
session.setPendingClarificationQuestion(question);
session.setPendingClarificationReason("PENDING_ACTION_DISAMBIGUATION");
session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, InteractionPolicyDecision.RECLASSIFY_INTENT.name());
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_DISAMBIGUATION_REQUIRED, true);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(ConvEnginePayloadKey.REASON, "MULTIPLE_PENDING_ACTIONS");
payload.put(ConvEnginePayloadKey.QUESTION, question);
payload.put(ConvEnginePayloadKey.CANDIDATE_COUNT, top.size());
payload.put(ConvEnginePayloadKey.OPTIONS, new ArrayList<>(options));
payload.put(ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(ConvEnginePayloadKey.STATE, session.getState());
payload.put(ConvEnginePayloadKey.QUESTION_SOURCE, questionResult.source());
audit.audit(ConvEngineAuditStage.DISAMBIGUATION_REQUIRED, session.getConversationId(), payload);
return new StepResult.Continue();
}
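The listing above delegates to buildQuestion(...), which is not shown. As a rough illustration of the non-LLM path, the sketch below renders the collected options as a numbered multiple-choice prompt. The wording of the question and the simplified single-argument signature are assumptions.

```java
import java.util.List;

final class DisambiguationQuestionSketch {
    // Builds a numbered prompt from option labels such as "CANCEL_FLIGHT (Cancel flight)".
    static String buildQuestion(List<String> options) {
        StringBuilder question = new StringBuilder("I found multiple possible actions. Which one did you mean?");
        for (int i = 0; i < options.size(); i++) {
            question.append('\n').append(i + 1).append(". ").append(options.get(i));
        }
        return question.toString();
    }
}
```

Since the options set in the listing is a LinkedHashSet, passing new ArrayList<>(options) to a method like this would keep the priority order the repository query returned.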
GuardrailStep
Responsibility: Apply guardrails and approval rules
Session Mutations: guardrail flags/sanitized text
Config/Table Dependencies: ce_config, ce_audit
Detailed Execution Logic
The last line of defense before intent resolution. It reads the guardrail thresholds and sanitization settings from ce_config.
If a command is flagged as "sensitive" (e.g., destructive actions like closing an account), it can force an explicit SENSITIVE_ACTION_APPROVAL_REQUIRED pause, blocking the pipeline from executing tasks until MFA or explicit user verification is acquired.
public StepResult execute(EngineSession session) {
if (!flowConfig.getGuardrail().isEnabled()) {
return new StepResult.Continue();
}
String originalUserText = session.getUserText() == null ? "" : session.getUserText();
String sanitizedUserText = sanitize(originalUserText);
if (flowConfig.getGuardrail().isSanitizeInput()) {
session.putInputParam(ConvEngineInputParamKey.SANITIZED_USER_TEXT, sanitizedUserText);
}
boolean sensitive = matchesSensitivePattern(sanitizedUserText);
boolean approvalRequired = flowConfig.getGuardrail().isRequireApprovalForSensitiveActions() && sensitive;
boolean approvalGranted = isApprovalGranted(session);
boolean failClosed = flowConfig.getGuardrail().isApprovalGateFailClosed();
boolean denied = approvalRequired && (!approvalGranted || failClosed && !approvalGranted);
if (denied) {
session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_BLOCKED, true);
session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_REASON, "SENSITIVE_ACTION_APPROVAL_REQUIRED");
session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, InteractionPolicyDecision.RECLASSIFY_INTENT.name());
session.putInputParam(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION, true);
session.putInputParam(ConvEngineInputParamKey.SKIP_PENDING_ACTION_EXECUTION, true);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("result", "DENY");
payload.put("reason", "SENSITIVE_ACTION_APPROVAL_REQUIRED");
payload.put("sensitive", true);
payload.put("approvalGranted", approvalGranted);
payload.put("userText", sanitizedUserText);
payload.put("intent", session.getIntent());
payload.put("state", session.getState());
audit.audit(ConvEngineAuditStage.GUARDRAIL_DENY, session.getConversationId(), payload);
return new StepResult.Continue();
}
session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_BLOCKED, false);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("result", "ALLOW");
payload.put("sensitive", sensitive);
payload.put("approvalRequired", approvalRequired);
payload.put("approvalGranted", approvalGranted);
payload.put("intent", session.getIntent());
payload.put("state", session.getState());
audit.audit(ConvEngineAuditStage.GUARDRAIL_ALLOW, session.getConversationId(), payload);
return new StepResult.Continue();
}
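The deny condition in the listing above can be checked exhaustively. The following is a minimal stand-alone sketch (the class GuardrailDenySketch and its method names are hypothetical and not part of ConvEngine) that copies the boolean expression and compares it with a reduced form over every input combination. As the expression is written in the listing, failClosed does not appear to change the result.

```java
final class GuardrailDenySketch {

    // The deny expression exactly as it appears in the listing above.
    static boolean deniedAsWritten(boolean approvalRequired, boolean approvalGranted, boolean failClosed) {
        return approvalRequired && (!approvalGranted || failClosed && !approvalGranted);
    }

    // Reduced form: (!g || (f && !g)) is equivalent to !g by absorption.
    static boolean deniedSimplified(boolean approvalRequired, boolean approvalGranted) {
        return approvalRequired && !approvalGranted;
    }

    // Compares both forms over all eight input combinations.
    static boolean formsAgree() {
        boolean[] values = {false, true};
        for (boolean required : values) {
            for (boolean granted : values) {
                for (boolean failClosed : values) {
                    if (deniedAsWritten(required, granted, failClosed) != deniedSimplified(required, granted)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forms agree: " + formsAgree());
    }
}
```

If fail-closed behaviour is meant to differ from the default, the condition would need a case where the two flags diverge; the sketch only documents what the expression shown evaluates to.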
Responsibility: Resolve intent with classifier+agent
Session Mutations: intent/state/clarification fields
Config/Table Dependencies: ce_intent, ce_intent_classifier, ce_config
Detailed Execution Logic
The primary intent matching gateway. Uses the CompositeIntentResolver (which merges Regex, Semantic Search, and LLM classifiers based off ce_intent_classifier).
If the interaction policy decided we are in FILL_PENDING_SLOT mode, this step is bypassed entirely (referred to as a "Locked Intent").
Otherwise:
- Queries ce_intent_classifier for matches.
- Uses INTENT_RESOLVED audit logs to map the intentCode.
- Sets the context state to IDLE (or whatever the initial configuration demands).
public StepResult execute(EngineSession session) {
String previousIntent = session.getIntent();
Map<String, Object> startPayload = new LinkedHashMap<>();
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.PREVIOUS_INTENT, previousIntent);
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCKED, session.isIntentLocked());
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCK_REASON, session.getIntentLockReason());
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_START, session.getConversationId(), startPayload);
if (session.isIntentLocked() || isActiveSchemaCollection(session)) {
if (!session.isIntentLocked()) {
session.lockIntent("SCHEMA_INCOMPLETE");
}
session.clearClarification();
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCKED, session.isIntentLocked());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCK_REASON, session.getIntentLockReason());
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_SCHEMA_COLLECTION, session.getConversationId(), payload);
return new StepResult.Continue();
}
if (shouldSkipResolutionForPolicy(session)) {
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.DIALOGUE_ACT, session.inputParamAsString(ConvEngineInputParamKey.DIALOGUE_ACT));
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.POLICY_DECISION, session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SKIP_INTENT_RESOLUTION, true);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "policy decision retained existing intent/state");
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_POLICY, session.getConversationId(), payload);
return new StepResult.Continue();
}
if (shouldSkipResolutionForStickyIntent(session)) {
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = existingIntentRetainedAuditPayload(session);
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_STICKY_INTENT, session.getConversationId(), payload);
return new StepResult.Continue();
}
CompositeIntentResolver.IntentResolutionResult result = intentResolver.resolveWithTrace(session);
if (result == null || result.resolvedIntent() == null) {
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_NO_CHANGE, session.getConversationId(), Map.of());
return new StepResult.Continue();
}
if (!result.resolvedIntent().equals(previousIntent)) {
session.setIntent(result.resolvedIntent());
}
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
audit.audit(
ConvEngineAuditStage.intentResolvedBy(result.source().name()),
session.getConversationId(),
result
);
return new StepResult.Continue();
}
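The order of the early exits in the listing above matters: the lock/schema check wins over the policy check, which wins over the sticky-intent check. A small sketch of that ordering, using a hypothetical IntentResolutionOrderSketch class whose Outcome names only mirror the audit stages in the listing:

```java
final class IntentResolutionOrderSketch {

    // Hypothetical outcome labels mirroring the audit stages in the listing.
    enum Outcome { SKIPPED_SCHEMA_COLLECTION, SKIPPED_POLICY, SKIPPED_STICKY_INTENT, NO_CHANGE, RESOLVED }

    // Evaluates the checks in the same order as the listing; the first hit wins.
    static Outcome decide(boolean intentLocked,
                          boolean activeSchemaCollection,
                          boolean policySkip,
                          boolean stickySkip,
                          String resolvedIntent) {
        if (intentLocked || activeSchemaCollection) {
            return Outcome.SKIPPED_SCHEMA_COLLECTION;
        }
        if (policySkip) {
            return Outcome.SKIPPED_POLICY;
        }
        if (stickySkip) {
            return Outcome.SKIPPED_STICKY_INTENT;
        }
        // A null result from the resolver leaves intent and state untouched.
        return resolvedIntent == null ? Outcome.NO_CHANGE : Outcome.RESOLVED;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, true, true, "REFUND"));
    }
}
```

Only the last branch reaches the resolver, so a locked intent never pays for classifier or LLM calls in this model.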
Responsibility: Reset on configured reset intent
Session Mutations: full reset
Config/Table Dependencies: ce_config RESET_INTENT_CODES
Detailed Execution Logic
A quality-of-life check. If the resolved intent, trimmed and upper-cased, matches one of the configured RESET_INTENT_CODES (e.g. START_OVER, RESET), this step immediately executes a session wipe akin to ResetConversationStep, persists the cleared conversation row, and returns the conversation to a clean slate.
public StepResult execute(EngineSession session) {
String intent = session.getIntent();
if (intent == null || !resetIntentCodes.contains(intent.trim().toUpperCase())) {
return new StepResult.Continue();
}
session.resetForConversationRestart();
session.getConversation().setStatus("RUNNING");
session.getConversation().setIntentCode("UNKNOWN");
session.getConversation().setStateCode("UNKNOWN");
session.getConversation().setContextJson("{}");
session.getConversation().setInputParamsJson("{}");
session.getConversation().setLastAssistantJson(null);
session.getConversation().setUpdatedAt(OffsetDateTime.now());
conversationRepository.save(session.getConversation());
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "INTENT_RESOLVED_RESET");
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.MATCHED_RESET_INTENT_CODES, resetIntentCodes);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.getContextJson());
audit.audit(ConvEngineAuditStage.CONVERSATION_RESET, session.getConversationId(), payload);
return new StepResult.Continue();
}
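The matching rule in the listing above is a plain set lookup on a normalized code. A minimal sketch, assuming the configured codes live in a case-sensitive Set; the class name ResetIntentMatchSketch is hypothetical, and Locale.ROOT is used here for locale-independent upper-casing, whereas the listing calls toUpperCase() without a locale:

```java
import java.util.Locale;
import java.util.Set;

final class ResetIntentMatchSketch {

    // Mirrors the guard in the listing: null never matches, otherwise the
    // trimmed, upper-cased code is looked up in the configured set.
    static boolean isResetIntent(String intent, Set<String> resetIntentCodes) {
        if (intent == null) {
            return false;
        }
        return resetIntentCodes.contains(intent.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Set<String> codes = Set.of("START_OVER", "RESET");
        System.out.println(isResetIntent(" start_over ", codes));
    }
}
```

Under this assumption, codes stored in lower case would never match, because the candidate is upper-cased before the lookup.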
Responsibility: Fill missing intent/state defaults
Session Mutations: intent/state
Config/Table Dependencies: none
Detailed Execution Logic
A safety net. If intent resolution leaves the session without an intent or a state (for example, the classifier returned no confident match or an exception occurred), this step audits the gap and assigns the engine fallback values, UNKNOWN intent and UNKNOWN state, so that ce_rule and ce_response tables can still define fallback messaging (e.g., "I didn't understand that").
public StepResult execute(EngineSession session) {
if (session.getIntent() == null) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.USER_TEXT, session.getUserText());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FALLBACK_INTENT, fallbackIntent);
audit.audit(ConvEngineAuditStage.INTENT_MISSING, session.getConversationId(), payload);
session.setIntent(fallbackIntent);
}
if (session.getState() == null) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.USER_TEXT, session.getUserText());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FALLBACK_STATE, fallbackState);
audit.audit(ConvEngineAuditStage.STATE_MISSING, session.getConversationId(), payload);
session.setState(fallbackState);
}
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
return new StepResult.Continue();
}
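The two null checks in the listing above are independent, so a session can keep a resolved intent while only its state is defaulted. A minimal sketch with a hypothetical FallbackDefaultsSketch class and IntentState record:

```java
final class FallbackDefaultsSketch {

    // Hypothetical value pair standing in for the session's intent and state.
    record IntentState(String intent, String state) {}

    // Fills each field independently; only null triggers the fallback,
    // so an empty or blank string is left as-is, as in the listing.
    static IntentState withDefaults(String intent, String state,
                                    String fallbackIntent, String fallbackState) {
        return new IntentState(
                intent == null ? fallbackIntent : intent,
                state == null ? fallbackState : state);
    }

    public static void main(String[] args) {
        System.out.println(withDefaults("REFUND", null, "UNKNOWN", "UNKNOWN"));
    }
}
```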
Responsibility: Fetch and attach container data
Session Mutations: containerData/context merge
Config/Table Dependencies: ce_container_config
Detailed Execution Logic
Bridges static tenant/consumer configurations. Looks up ce_container_config for the current intent and state (falling back to a state-level and then a global configuration), fetches the configured container data, and merges it into session.contextJson under the container_data key. This allows things like "Store Hours" or "Region Policies" to be attached to LLM contexts without hardcoding.
public StepResult execute(EngineSession session) {
List<CeContainerConfig> configs =
containerConfigRepo.findByIntentAndState(
session.getIntent(),
session.getState()
);
if (configs.isEmpty()) {
configs = containerConfigRepo.findFallbackByState(session.getState());
}
if (configs.isEmpty()) {
configs = containerConfigRepo.findGlobalFallback();
}
if (configs.isEmpty()) {
Map<String, Object> reasonMap = new HashMap<>();
reasonMap.put("reason", "no container configs for intent/state");
reasonMap.put("intent", session.getIntent());
reasonMap.put("state", session.getState());
audit.audit(
"CONTAINER_DATA_SKIPPED",
session.getConversationId(),
reasonMap
);
return new StepResult.Continue();
}
ObjectNode containerRoot = mapper.createObjectNode();
for (CeContainerConfig cfg : configs) {
try {
Map<String, Object> inputParams = new HashMap<>();
String key = cfg.getInputParamName();
Object value = session.extractValueFromContext(key);
if(value == null) {
value = session.getUserText();
}
inputParams.put(key, value);
if (session.getInputParams() != null) {
inputParams.putAll(session.getInputParams());
}
if (session.getEngineContext().getInputParams() != null) {
inputParams.putAll(session.getEngineContext().getInputParams());
}
PageInfoRequest pageInfo = PageInfoRequest.builder()
.userId("convengine")
.loggedInUserId("convengine")
.pageId(cfg.getPageId())
.sectionId(cfg.getSectionId())
.containerId(cfg.getContainerId())
.inputParams(inputParams)
.build();
ContainerComponentRequest req = new ContainerComponentRequest();
req.setPageInfo(List.of(pageInfo));
req.setRequestTypes(List.of(RequestType.CONTAINER));
interceptorExecutor.beforeExecute(req, session);
ContainerComponentResponse resp = ccfCoreService.execute(req);
resp = interceptorExecutor.afterExecute(resp, session);
// find classes with @ContainerDataTransformer(state, intent) to transform resp if needed
Map<String, Object> transformedData = transformerService.transformIfApplicable(resp, session, inputParams);
JsonNode responseNode = transformedData == null ? mapper.valueToTree(resp) : mapper.valueToTree(transformedData);
session.setContainerData(responseNode);
containerRoot.set(cfg.getInputParamName(), responseNode);
Map<String, Object> jsonMap = Map.of(
"containerId", cfg.getContainerId(),
"pageId", cfg.getPageId(),
"sectionId", cfg.getSectionId(),
"inputParam", cfg.getInputParamName(),
"requestInput", inputParams,
"response", responseNode
);
audit.audit(
"CONTAINER_DATA_EXECUTED",
session.getConversationId(),
jsonMap
);
} catch (Exception e) {
Map<String, Object> errorJsonMap = new HashMap<>();
errorJsonMap.put("containerId", cfg.getContainerId());
errorJsonMap.put("error", e.getMessage());
audit.audit(
"CONTAINER_DATA_FAILED",
session.getConversationId(),
errorJsonMap
);
}
}
if (!containerRoot.isEmpty()) {
// attach to session
session.setContainerDataJson(containerRoot.toString());
session.setHasContainerData(true);
// merge into conversation context
try {
ObjectNode ctx = (ObjectNode) mapper.readTree(session.getContextJson());
ctx.set("container_data", containerRoot);
session.setContextJson(mapper.writeValueAsString(ctx));
session.getConversation().setContextJson(session.getContextJson());
} catch (Exception ignore) {
// context merge failure should not break pipeline
}
audit.audit(
"CONTAINER_DATA_ATTACHED",
session.getConversationId(),
containerRoot.toString()
);
}
return new StepResult.Continue();
}
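The three repository calls at the top of the listing above form a most-specific-first fallback chain. A minimal sketch of that pattern, using a hypothetical ContainerLookupSketch helper with suppliers standing in for the repository methods:

```java
import java.util.List;
import java.util.function.Supplier;

final class ContainerLookupSketch {

    // Runs the lookups in order and returns the first non-empty result.
    // Later lookups are not invoked once an earlier one has produced rows.
    @SafeVarargs
    static <T> List<T> firstNonEmpty(Supplier<List<T>>... lookups) {
        for (Supplier<List<T>> lookup : lookups) {
            List<T> found = lookup.get();
            if (found != null && !found.isEmpty()) {
                return found;
            }
        }
        return List.of();
    }

    public static void main(String[] args) {
        List<String> configs = ContainerLookupSketch.<String>firstNonEmpty(
                () -> List.of(),               // intent + state: nothing configured
                () -> List.of("state-level"),  // state-only fallback
                () -> List.of("global"));      // global fallback
        System.out.println(configs);
    }
}
```

An empty result from all three lookups corresponds to the CONTAINER_DATA_SKIPPED branch in the listing.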
Responsibility: Execute/reject pending action task
Session Mutations: pending_action_runtime status/result
Config/Table Dependencies: ce_pending_action, CeTaskExecutor, ce_audit
Detailed Execution Logic
Executes the backend task behind a pending action. If the InteractionPolicy is EXECUTE_PENDING_ACTION and the status is IN_PROGRESS, this step resolves the Spring Bean ID attached to the ce_pending_action row. If the policy is REJECT_PENDING_ACTION, the action is marked REJECTED without running anything, and the step is skipped entirely when the guardrail has blocked the turn.
It invokes CeTaskExecutor.execute(), runs the backend transaction (e.g. Stripe Refund), and records the outcome (EXECUTED or FAILED) in the EngineSession for downstream rules to evaluate.
public StepResult execute(EngineSession session) {
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_PENDING_ACTION_EXECUTION))
|| Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.GUARDRAIL_BLOCKED))) {
Map<String, Object> payload = basePayload(session, InteractionPolicyDecision.RECLASSIFY_INTENT, null);
payload.put(ConvEnginePayloadKey.REASON, "pending action skipped by guardrail");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_SKIPPED, session.getConversationId(), payload);
return new StepResult.Continue();
}
String decisionRaw = session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION);
InteractionPolicyDecision decision = parseDecision(decisionRaw);
if (decision != InteractionPolicyDecision.EXECUTE_PENDING_ACTION
&& decision != InteractionPolicyDecision.REJECT_PENDING_ACTION) {
return new StepResult.Continue();
}
Map<String, Object> context = session.contextDict();
Object pendingAction = context.get("pending_action");
if (pendingAction == null) {
pendingAction = context.get("pendingAction");
}
String actionKey = resolveActionKey(session, context, pendingAction);
String actionRef = resolveActionReference(session, pendingAction, actionKey);
if (actionRef == null || actionRef.isBlank()) {
Map<String, Object> payload = basePayload(session, decision, null);
payload.put("actionKey", actionKey);
payload.put(ConvEnginePayloadKey.REASON, actionKey == null || actionKey.isBlank()
? "pending action reference not found or ambiguous registry mapping"
: "pending action reference not found");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_SKIPPED, session.getConversationId(), payload);
return new StepResult.Continue();
}
if (decision == InteractionPolicyDecision.REJECT_PENDING_ACTION) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "REJECTED");
updateRuntimeStatus(session, PendingActionStatus.REJECTED);
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "REJECTED");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_REJECTED, session.getConversationId(), payload);
return new StepResult.Continue();
}
String[] taskRef = parseTaskReference(actionRef);
if (taskRef == null) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "FAILED");
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "FAILED");
payload.put(ConvEnginePayloadKey.REASON, "invalid pending action reference");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_FAILED, session.getConversationId(), payload);
return new StepResult.Continue();
}
Object executionResult = ceTaskExecutor.execute(taskRef[0], taskRef[1], session);
if (executionResult == null) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "FAILED");
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "FAILED");
payload.put(ConvEnginePayloadKey.REASON, "task execution returned null");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_FAILED, session.getConversationId(), payload);
return new StepResult.Continue();
}
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "EXECUTED");
updateRuntimeStatus(session, PendingActionStatus.EXECUTED);
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "EXECUTED");
payload.put("taskBean", taskRef[0]);
payload.put("taskMethods", taskRef[1]);
audit.audit(ConvEngineAuditStage.PENDING_ACTION_EXECUTED, session.getConversationId(), payload);
return new StepResult.Continue();
}
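The branches in the listing above reduce to a small decision table. A minimal sketch, with a hypothetical PendingActionOutcomeSketch class; the NOT_APPLICABLE label is invented here for the case where the listing simply continues without writing a result:

```java
final class PendingActionOutcomeSketch {

    // Hypothetical stand-in for the policy decisions referenced in the listing.
    enum Decision { EXECUTE_PENDING_ACTION, REJECT_PENDING_ACTION, OTHER }

    // Walks the same checks, in the same order, as the listing.
    static String outcome(boolean guardrailSkip, Decision decision, String actionRef,
                          boolean taskRefValid, Object executionResult) {
        if (guardrailSkip) {
            return "SKIPPED";
        }
        if (decision == Decision.OTHER) {
            return "NOT_APPLICABLE"; // label invented for this sketch
        }
        if (actionRef == null || actionRef.isBlank()) {
            return "SKIPPED";
        }
        if (decision == Decision.REJECT_PENDING_ACTION) {
            return "REJECTED";
        }
        if (!taskRefValid || executionResult == null) {
            return "FAILED";
        }
        return "EXECUTED";
    }

    public static void main(String[] args) {
        System.out.println(outcome(false, Decision.EXECUTE_PENDING_ACTION, "refundTask:run", true, Boolean.TRUE));
    }
}
```

Note that a task returning null is treated as a failure in the listing, so a task that succeeds needs to return some non-null value.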
Responsibility: Run tool_group based orchestration
Session Mutations: tool_request/tool_result fields
Config/Table Dependencies: ce_tool, ce_mcp_tool, ce_audit
Detailed Execution Logic
The gateway for Model Context Protocol (MCP) tooling. If ce_tool specifies that this intent requires a tool_group, this step binds the request and delegates to an external executor. It executes the backend SQL or REST fetch before any LLM response is generated and stores the JSON result, together with a status (SUCCESS, SKIPPED_SCOPE_MISMATCH, or ERROR), under tool_result in the session input params.
public StepResult execute(EngineSession session) {
if (!flowConfig.getToolOrchestration().isEnabled()) {
return new StepResult.Continue();
}
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION))) {
return new StepResult.Continue();
}
ToolRequest request = resolveRequest(session);
if (request == null) {
return new StepResult.Continue();
}
session.putInputParam(ConvEngineInputParamKey.TOOL_REQUEST, request.toMap());
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_REQUEST, session.getConversationId(), request.toMap());
try {
CeMcpTool tool = request.toolCode() == null || request.toolCode().isBlank()
? null
: registry.requireTool(request.toolCode(), session.getIntent(), session.getState());
String group = request.toolGroup();
if ((group == null || group.isBlank()) && tool != null) {
group = registry.normalizeToolGroup(tool.getToolGroup());
}
if (group == null || group.isBlank()) {
throw new IllegalStateException("tool_group is required when tool_code is not resolvable");
}
McpToolExecutor executor = resolveExecutor(group);
String resultJson = executor.execute(tool, request.args(), session);
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "SUCCESS");
result.put("tool_code", request.toolCode());
result.put("tool_group", group);
result.put("result", parseJsonOrString(resultJson));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "SUCCESS");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_RESULT, session.getConversationId(), result);
rulesStep.applyRules(session, "ToolOrchestrationStep PostTool", RulePhase.POST_TOOL_EXECUTION.name());
} catch (IllegalStateException e) {
if (e.getMessage() != null && e.getMessage().contains("Missing enabled MCP tool for current intent/state")) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "SKIPPED_SCOPE_MISMATCH");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("intent", session.getIntent());
result.put("state", session.getState());
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "SKIPPED_SCOPE_MISMATCH");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_RESULT, session.getConversationId(), result);
return new StepResult.Continue();
}
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "ERROR");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("error", String.valueOf(e.getMessage()));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "ERROR");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_ERROR, session.getConversationId(), result);
} catch (Exception e) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "ERROR");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("error", String.valueOf(e.getMessage()));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "ERROR");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_ERROR, session.getConversationId(), result);
}
return new StepResult.Continue();
}
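The tool-group resolution in the listing above prefers the group named on the request and only then falls back to the group of the resolved tool. A minimal sketch, with a hypothetical ToolGroupResolutionSketch class; the normalize function is passed in because the behaviour of registry.normalizeToolGroup is not shown in the listing, and a null toolGroup stands in for "no tool resolved":

```java
import java.util.function.UnaryOperator;

final class ToolGroupResolutionSketch {

    // Request group wins; otherwise the tool's own group is normalized and used.
    static String resolveGroup(String requestGroup, String toolGroup, UnaryOperator<String> normalize) {
        String group = requestGroup;
        if ((group == null || group.isBlank()) && toolGroup != null) {
            group = normalize.apply(toolGroup);
        }
        if (group == null || group.isBlank()) {
            // Same message as the listing.
            throw new IllegalStateException("tool_group is required when tool_code is not resolvable");
        }
        return group;
    }

    public static void main(String[] args) {
        // The upper-casing normalizer is an assumption made for this example only.
        System.out.println(resolveGroup(" ", "http_api", s -> s.trim().toUpperCase()));
    }
}
```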
Responsibility: MCP planner/tool loop
Session Mutations: context_json.mcp.*
Config/Table Dependencies: ce_mcp_tool, ce_mcp_db_tool, ce_mcp_planner (fallback ce_config)
Detailed Execution Logic
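Works from the ce_mcp_tool bindings enabled for the current intent and state. Instead of static grouped tools, this step runs an agent planner in a bounded loop (up to MAX_LOOPS iterations): the planner interprets the input, selects an MCP tool, writes the arguments, and the step executes the tool and feeds the observation back until the planner returns an answer. This is the core of dynamic tool use in ConvEngine V2.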
public StepResult execute(EngineSession session) {
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION))
|| Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.GUARDRAIL_BLOCKED))) {
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "SKIPPED_BY_GUARDRAIL");
return new StepResult.Continue();
}
if (session.hasPendingClarification()) {
audit.audit(
ConvEngineAuditStage.MCP_SKIPPED_PENDING_CLARIFICATION,
session.getConversationId(),
mapOf(
"intent", session.getIntent(),
"state", session.getState()
)
);
return new StepResult.Continue();
}
List<CeMcpTool> tools = registry.listEnabledTools(session.getIntent(), session.getState());
if (CollectionUtils.isEmpty(tools)) {
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "NO_TOOLS_FOR_SCOPE");
audit.audit(ConvEngineAuditStage.MCP_NO_TOOLS_AVAILABLE, session.getConversationId(),
mapOf("intent", session.getIntent(), "state", session.getState()));
return new StepResult.Continue();
}
clearMcpContext(session);
List<McpObservation> observations = readObservationsFromContext(session);
boolean mcpTouched = false;
for (int i = 0; i < MAX_LOOPS; i++) {
McpPlan plan = planner.plan(session, tools, observations);
mcpTouched = true;
session.putInputParam(ConvEngineInputParamKey.MCP_ACTION, plan.action());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_CODE, plan.tool_code());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_ARGS, plan.args() == null ? Map.of() : plan.args());
if ("ANSWER".equalsIgnoreCase(plan.action())) {
// store final answer in contextJson; your ResponseResolutionStep can use it via derivation_hint
writeFinalAnswerToContext(session, plan.answer());
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, plan.answer() == null ? "" : plan.answer());
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "ANSWER");
audit.audit(
ConvEngineAuditStage.MCP_FINAL_ANSWER,
session.getConversationId(),
mapOf("answer", plan.answer())
);
break;
}
if (!"CALL_TOOL".equalsIgnoreCase(plan.action())) {
writeFinalAnswerToContext(session, "I couldn't decide the next tool step safely.");
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, "I couldn't decide the next tool step safely.");
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "FALLBACK");
break;
}
String toolCode = plan.tool_code();
Map<String, Object> args = (plan.args() == null) ? Map.of() : plan.args();
audit.audit(
ConvEngineAuditStage.MCP_TOOL_CALL,
session.getConversationId(),
mapOf("tool_code", toolCode, "args", args)
);
CeMcpTool tool = registry.requireTool(toolCode, session.getIntent(), session.getState());
String toolGroup = registry.normalizeToolGroup(tool.getToolGroup());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_GROUP, toolGroup);
try {
McpToolExecutor executor = resolveExecutor(toolGroup);
String rowsJson = executor.execute(tool, args, session);
observations.add(new McpObservation(toolCode, rowsJson));
writeObservationsToContext(session, observations);
session.putInputParam(ConvEngineInputParamKey.MCP_OBSERVATIONS, observations);
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "TOOL_RESULT");
audit.audit(
ConvEngineAuditStage.MCP_TOOL_RESULT,
session.getConversationId(),
mapOf("tool_code", toolCode, "tool_group", toolGroup, "rows", rowsJson)
);
} catch (Exception e) {
audit.audit(
ConvEngineAuditStage.MCP_TOOL_ERROR,
session.getConversationId(),
mapOf("tool_code", toolCode, "tool_group", toolGroup, "error", String.valueOf(e.getMessage()))
);
writeFinalAnswerToContext(session, "Tool execution failed safely. Can you narrow the request?");
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, "Tool execution failed safely. Can you narrow the request?");
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "TOOL_ERROR");
break;
}
}
if (mcpTouched) {
rulesStep.applyRules(session, "McpToolStep", RulePhase.POST_AGENT_MCP.name());
}
session.syncToConversation();
return new StepResult.Continue();
}
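The control flow in the listing above is a bounded plan/act loop. A minimal sketch of that loop, with hypothetical Plan, Observation and Outcome records; the planner and tool executor are plain functions, and the MAX_LOOPS value is an assumption because the real constant is not shown:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class McpLoopSketch {

    record Plan(String action, String toolCode, String answer) {}
    record Observation(String toolCode, String json) {}
    record Outcome(String status, String answer, int toolCalls) {}

    // Assumption: the real MAX_LOOPS value is not shown in the listing.
    static final int MAX_LOOPS = 5;

    static Outcome run(Function<List<Observation>, Plan> planner, Function<String, String> toolExecutor) {
        List<Observation> observations = new ArrayList<>();
        for (int i = 0; i < MAX_LOOPS; i++) {
            Plan plan = planner.apply(observations);
            if ("ANSWER".equalsIgnoreCase(plan.action())) {
                return new Outcome("ANSWER", plan.answer(), observations.size());
            }
            if (!"CALL_TOOL".equalsIgnoreCase(plan.action())) {
                // The listing stores a fixed fallback message here.
                return new Outcome("FALLBACK", null, observations.size());
            }
            try {
                String json = toolExecutor.apply(plan.toolCode());
                observations.add(new Observation(plan.toolCode(), json));
            } catch (RuntimeException e) {
                return new Outcome("TOOL_ERROR", null, observations.size());
            }
        }
        // Loop budget exhausted: the last status written was TOOL_RESULT.
        return new Outcome("TOOL_RESULT", null, observations.size());
    }

    public static void main(String[] args) {
        Outcome outcome = run(
                seen -> seen.isEmpty()
                        ? new Plan("CALL_TOOL", "demo.lookup", null)
                        : new Plan("ANSWER", null, "done"),
                toolCode -> "[]");
        System.out.println(outcome);
    }
}
```

One consequence visible in the sketch: if the planner never answers, the loop ends with the status of the last tool call and no final answer, so downstream rules may want to handle that case explicitly.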
Responsibility: Schema-driven extraction and lock handling
Session Mutations: schema facts/context/lock
Config/Table Dependencies: ce_output_schema, ce_prompt_template
Detailed Execution Logic
Evaluates ce_output_schema. It injects the missing required slots into an LLM extraction prompt built from ce_prompt_template. The LLM returns a structured JSON map, which this step merges into session.contextJson.
Prompt-template usage details:
- selects ce_prompt_template with response_type=SCHEMA_JSON for the same intent_code + state_code
- interaction_mode=COLLECT is the recommended semantic marker for these templates
- interaction_contract can declare expects:["structured_input"] so the template contract remains explicit in configuration
- after merge, POST_SCHEMA_EXTRACTION rules can move the state or set runtime flags
It then runs missingFieldEvaluator.evaluate(). If fields are missing, it sets session.setSchemaLocked(true).
public StepResult execute(EngineSession session) {
String intent = session.getIntent();
String state = session.getState();
CeOutputSchema schema = outputSchemaRepo.findAll().stream()
.filter(s -> Boolean.TRUE.equals(s.getEnabled()))
.filter(s -> equalsIgnoreCase(s.getIntentCode(), intent))
.filter(s -> equalsIgnoreCase(s.getStateCode(), state) || equalsIgnoreCase(s.getStateCode(), "ANY"))
.min((a, b) -> Integer.compare(priorityOf(a), priorityOf(b)))
.orElse(null);
if (schema != null) {
runExtraction(session, schema);
} else {
session.unlockIntent();
session.setResolvedSchema(null);
session.setSchemaComplete(false);
session.setSchemaHasAnyValue(false);
session.setMissingRequiredFields(new ArrayList<>());
session.setMissingFieldOptions(new LinkedHashMap<>());
session.addPromptTemplateVars();
}
session.syncFromConversation(true);
return new StepResult.Continue();
}
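The schema lookup in the listing above filters by enabled flag, intent and state (or ANY) and then takes the lowest priority number. A minimal sketch of that selection, with a hypothetical Schema record in place of CeOutputSchema and a null-safe, case-insensitive comparison that is an assumption about the equalsIgnoreCase helper:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class SchemaSelectionSketch {

    // Hypothetical stand-in for the CeOutputSchema fields used by the lookup.
    record Schema(String intentCode, String stateCode, int priority, Boolean enabled) {}

    static boolean eq(String a, String b) {
        return a != null && a.equalsIgnoreCase(b);
    }

    // Enabled schemas for the intent, matching the state or ANY; lowest priority wins.
    static Optional<Schema> select(List<Schema> all, String intent, String state) {
        return all.stream()
                .filter(s -> Boolean.TRUE.equals(s.enabled()))
                .filter(s -> eq(s.intentCode(), intent))
                .filter(s -> eq(s.stateCode(), state) || eq(s.stateCode(), "ANY"))
                .min(Comparator.comparingInt(Schema::priority));
    }

    public static void main(String[] args) {
        List<Schema> all = List.of(
                new Schema("REFUND", "COLLECT", 20, true),
                new Schema("REFUND", "ANY", 10, true));
        System.out.println(select(all, "REFUND", "COLLECT"));
    }
}
```

Because only the priority decides between candidates, an ANY-state schema with a lower priority number is chosen over an exact state match in this model.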
Responsibility: Compute schema status facts
Session Mutations: schemaComplete/hasAny
Config/Table Dependencies: resolved schema + context
Detailed Execution Logic
In V1, rules had to manually check if schema extraction was done. In V2, this step computes the boolean flags schemaComplete and hasAny and binds them to the session context. This allows ce_rule to simply trigger on schemaComplete == true.
public StepResult execute(EngineSession session) {
if (session.getResolvedSchema() == null) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "no schema resolved");
audit.audit(ConvEngineAuditStage.AUTO_ADVANCE_SKIPPED_NO_SCHEMA, session.getConversationId(), payload);
return new StepResult.Continue();
}
String schemaJson = session.getResolvedSchema().getJsonSchema();
String contextJson = session.getContextJson();
boolean hasAnySchemaValue = JsonUtil.hasAnySchemaValue(contextJson, schemaJson);
boolean schemaComplete = JsonUtil.isSchemaComplete(schemaJson, contextJson);
session.setSchemaHasAnyValue(hasAnySchemaValue);
session.setSchemaComplete(schemaComplete);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SCHEMA_COMPLETE, schemaComplete);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.HAS_ANY_SCHEMA_VALUE, hasAnySchemaValue);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
audit.audit(ConvEngineAuditStage.AUTO_ADVANCE_FACTS, session.getConversationId(), payload);
return new StepResult.Continue();
}
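The two facts computed in the listing above can be pictured as "any field filled" versus "all required fields filled". A minimal sketch over a flat map; the SchemaFactsSketch class and its notion of a filled value (non-null and, for strings, non-blank) are assumptions, since the rules inside JsonUtil are not shown:

```java
import java.util.List;
import java.util.Map;

final class SchemaFactsSketch {

    // Assumption: a field counts as filled when it is non-null and, for strings, non-blank.
    static boolean hasValue(Object value) {
        if (value == null) {
            return false;
        }
        return !(value instanceof String) || !((String) value).isBlank();
    }

    // True when at least one schema field is filled in the context.
    static boolean hasAnyValue(Map<String, Object> context, List<String> schemaFields) {
        return schemaFields.stream().anyMatch(field -> hasValue(context.get(field)));
    }

    // True when every required field is filled in the context.
    static boolean isComplete(Map<String, Object> context, List<String> requiredFields) {
        return requiredFields.stream().allMatch(field -> hasValue(context.get(field)));
    }

    public static void main(String[] args) {
        Map<String, Object> context = Map.<String, Object>of("orderId", "A-1", "reason", " ");
        List<String> required = List.of("orderId", "reason");
        System.out.println(hasAnyValue(context, required) + " " + isComplete(context, required));
    }
}
```

A partially filled context therefore yields hasAny true and schemaComplete false, which is the combination a rule would typically use to keep asking for the remaining fields.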
Responsibility: Match and apply transitions/actions
Session Mutations: intent/state/input params
Config/Table Dependencies: ce_rule
Detailed Execution Logic
The core state-machine driver. It queries ce_rule for the current Intent and State.
It evaluates expressions (like JSON_PATH or REGEX) against the session.contextJson.
If a rule matches, it executes the target action (e.g. SET_STATE to CONFIRMATION, or SET_TASK). It loops until no more rules match, effectively "auto-advancing" state machine nodes.
public StepResult execute(EngineSession session) {
applyRules(session, "RulesStep", RulePhase.PRE_RESPONSE_RESOLUTION.name());
session.syncToConversation();
return new StepResult.Continue();
}
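The auto-advance behaviour described above (re-evaluating rules until nothing matches) can be sketched as a fixed-point loop. The RuleLoopSketch class, its Rule record and the MAX_PASSES guard are hypothetical; the body of applyRules is not shown in the listing, so this illustrates only the looping idea:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class RuleLoopSketch {

    // Hypothetical rule: applies in one state, tests the context, moves to a target state.
    record Rule(String state, Predicate<Map<String, Object>> condition, String targetState) {}

    // Assumption: a pass cap guards against rules that cycle between states.
    static final int MAX_PASSES = 10;

    static String advance(String state, Map<String, Object> context, List<Rule> rules) {
        String current = state;
        for (int pass = 0; pass < MAX_PASSES; pass++) {
            String before = current;
            for (Rule rule : rules) {
                if (rule.state().equals(current) && rule.condition().test(context)) {
                    current = rule.targetState();
                    break; // first matching rule wins for this pass
                }
            }
            if (current.equals(before)) {
                break; // no rule changed the state; the machine has settled
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule("COLLECT", ctx -> Boolean.TRUE.equals(ctx.get("schemaComplete")), "CONFIRMATION"),
                new Rule("CONFIRMATION", ctx -> Boolean.TRUE.equals(ctx.get("confirmed")), "PROCESSING"));
        System.out.println(advance("COLLECT", Map.<String, Object>of("schemaComplete", true, "confirmed", true), rules));
    }
}
```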
Responsibility: Validate state transition path
Session Mutations: state_graph_valid/reason
Config/Table Dependencies: ce_state_graph, ce_audit
Detailed Execution Logic
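A strict validator. Checks ce_state_graph to see if the transition that just occurred in RulesStep was legally defined by the developer. If a rule jumped from IDLE to CANCELLED but there is no edge in the graph, this step audits a STATE_GRAPH_VIOLATION, sets state_graph_valid to false and, when soft-block is enabled, sets a soft-block flag. The code below only validates (validateOnly is true) and does not itself revert the state; acting on the flags is left to downstream handling.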
public StepResult execute(EngineSession session) {
    if (!flowConfig.getStateGraph().isEnabled()) {
        return new StepResult.Continue();
    }
    // fromState is the persisted state; toState is the state after this turn's mutations
    String fromState = session.getConversation() == null ? null : session.getConversation().getStateCode();
    String toState = session.getState();
    if (fromState == null || fromState.isBlank() || toState == null || toState.isBlank()
            || fromState.equalsIgnoreCase(toState)) {
        return new StepResult.Continue();
    }
    boolean allowed = isAllowedTransition(fromState, toState);
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("fromState", fromState);
    payload.put("toState", toState);
    payload.put("intent", session.getIntent());
    payload.put("validateOnly", true);
    if (allowed) {
        session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_VALID, true);
        payload.put("allowed", true);
        audit.audit(ConvEngineAuditStage.STATE_GRAPH_VALID, session.getConversationId(), payload);
        return new StepResult.Continue();
    }
    // Violation: audit it and flag the session; the pipeline still continues
    payload.put("allowed", false);
    payload.put("softBlock", flowConfig.getStateGraph().isSoftBlockOnViolation());
    audit.audit(ConvEngineAuditStage.STATE_GRAPH_VIOLATION, session.getConversationId(), payload);
    session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_VALID, false);
    if (flowConfig.getStateGraph().isSoftBlockOnViolation()) {
        session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_SOFT_BLOCK, true);
    }
    return new StepResult.Continue();
}
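The body of `isAllowedTransition` is not shown above. One plausible shape, sketched here under the assumption that the allowed edges are available as an in-memory adjacency map, is a case-insensitive lookup. The `EDGES` map and its contents are hypothetical example data, not the engine's configuration.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

final class StateGraphSketch {

    // Hypothetical edges for illustration; the real source of edges is not shown in this section.
    static final Map<String, Set<String>> EDGES = Map.of(
            "IDLE", Set.of("COLLECTING"),
            "COLLECTING", Set.of("CONFIRMATION", "CANCELLED"),
            "CONFIRMATION", Set.of("COMPLETED", "CANCELLED"));

    // Returns true only if an explicit edge from -> to exists (case-insensitive).
    static boolean isAllowedTransition(String from, String to) {
        if (from == null || to == null) {
            return false;
        }
        String f = from.trim().toUpperCase(Locale.ROOT);
        String t = to.trim().toUpperCase(Locale.ROOT);
        return EDGES.getOrDefault(f, Set.of()).contains(t);
    }

    public static void main(String[] args) {
        System.out.println(isAllowedTransition("IDLE", "CANCELLED"));       // prints false
        System.out.println(isAllowedTransition("collecting", "cancelled")); // prints true
    }
}
```

With this data, the IDLE to CANCELLED jump from the example above is rejected because IDLE only has an edge to COLLECTING.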
Responsibility: Resolve and generate output payload
Session Mutations: payload/last assistant json
Config/Table Dependencies: ce_response, ce_prompt_template
Detailed Execution Logic
The final output generator. Queries ce_response for the current intent and state.
- If TEXT: Returns a hardcoded string.
- If DERIVED: Loads ce_prompt_template, injects the contextJson, tool_result, and schema, and asks the LLM to write a fluid, contextual response to the user.
- interaction_mode does not change template lookup, but it should describe the intended turn semantics for the selected state (CONFIRM, PROCESSING, FINAL, etc.)
- interaction_contract is where consumers should declare capabilities such as retry on PROCESSING prompts or affirm/edit on CONFIRM prompts
Sets session.getConversation().setLastAssistantJson() with the payload.
public StepResult execute(EngineSession session) {
    // Intent collision: delegate to the collision resolver instead of resolving a ce_response row
    if (AgentIntentResolver.INTENT_COLLISION_STATE.equals(session.getState())) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
        audit.audit(
                ConvEngineAuditStage.INTENT_COLLISION_DETECTED,
                session.getConversationId(),
                payload
        );
        agentIntentCollisionResolver.resolve(session);
        return new StepResult.Continue();
    }
    Optional<CeResponse> responseOptional = resolveResponse(session);
    if (responseOptional.isEmpty()) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
        audit.audit(
                ConvEngineAuditStage.RESPONSE_MAPPING_NOT_FOUND,
                session.getConversationId(),
                payload
        );
        throw new ConversationEngineException(
                ConversationEngineErrorCode.RESPONSE_MAPPING_NOT_FOUND,
                "No response found for intent=" + session.getIntent() + ", state=" + session.getState()
        );
    }
    CeResponse resp = responseOptional.get();
    // Adopt the response row's state unless it already matches or is the ANY wildcard
    if (!matches(resp.getStateCode(), session.getState()) && !matches(resp.getStateCode(), "ANY")) {
        session.setState(resp.getStateCode());
        session.getConversation().setStateCode(resp.getStateCode());
    }
    Map<String, Object> responsePayload = new LinkedHashMap<>();
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_ID, resp.getResponseId());
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    audit.audit(
            ConvEngineAuditStage.RESOLVE_RESPONSE,
            session.getConversationId(),
            responsePayload
    );
    List<ConversationTurn> conversationTurns = historyProvider.lastTurns(session.getConversationId(), 10);
    session.setConversationHistory(conversationTurns);
    CePromptTemplate template = null;
    // Only DERIVED responses need a prompt template; pick the highest-scoring enabled match
    if (ResponseType.DERIVED.name().equalsIgnoreCase(resp.getResponseType())) {
        template = promptRepo.findAll().stream()
                .filter(t -> Boolean.TRUE.equals(t.getEnabled()))
                .filter(t -> resp.getOutputFormat().equalsIgnoreCase(t.getResponseType()))
                .filter(t -> matchesOrNull(t.getIntentCode(), session.getIntent()))
                .filter(t -> matchesOrNull(t.getStateCode(), session.getState()) || matches(t.getStateCode(), "ANY"))
                .max(Comparator.comparingInt(t -> score(t, session)))
                .orElseThrow(() ->
                        new IllegalStateException(
                                "No ce_prompt_template found for response_type=" +
                                        resp.getOutputFormat() + ", intent=" + session.getIntent() + ", state=" + session.getState()
                        )
                );
    }
    typeFactory
            .get(resp.getResponseType())
            .resolve(session, PromptTemplate.initFrom(template), ResponseTemplate.initFrom(resp));
    OutputPayload transformedOutput = responseTransformerService.transformIfApplicable(session.getPayload(), session, session.getInputParams());
    session.setPayload(transformedOutput);
    // Unwrap the payload to a plain value for the audit record
    Object payloadValue = switch (session.getPayload()) {
        case TextPayload(String text) -> text;
        case JsonPayload(String json) -> json;
        case null -> null;
    };
    Map<String, Object> outputPayload = new LinkedHashMap<>();
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.OUTPUT, payloadValue);
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.OUTPUT_FORMAT, resp.getOutputFormat());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_TYPE, resp.getResponseType());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_ID, resp.getResponseId());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SCHEMA_JSON, session.schemaJson());
    audit.audit(ConvEngineAuditStage.ASSISTANT_OUTPUT, session.getConversationId(), outputPayload);
    return new StepResult.Continue();
}
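The template lookup above filters candidates and then keeps the one with the highest `score(t, session)`, whose body is not shown. The sketch below illustrates one way such specificity scoring could work. The `Template` record and the weights (2 for an exact intent match, 1 for an exact state match) are assumptions for the example, not the engine's actual scoring.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class TemplatePickSketch {

    // Hypothetical, trimmed-down stand-in for a ce_prompt_template row.
    record Template(String name, String intentCode, String stateCode) {}

    // A null column acts as a wildcard, mirroring matchesOrNull in the step.
    static boolean matchesOrNull(String value, String expected) {
        return value == null || value.equalsIgnoreCase(expected);
    }

    // Assumed weights: exact intent match counts 2, exact state match counts 1.
    static int score(Template t, String intent, String state) {
        int s = 0;
        if (t.intentCode() != null && t.intentCode().equalsIgnoreCase(intent)) s += 2;
        if (t.stateCode() != null && t.stateCode().equalsIgnoreCase(state)) s += 1;
        return s;
    }

    // Filters to compatible templates, then prefers the most specific one.
    static Optional<Template> pick(List<Template> all, String intent, String state) {
        return all.stream()
                .filter(t -> matchesOrNull(t.intentCode(), intent))
                .filter(t -> matchesOrNull(t.stateCode(), state) || "ANY".equalsIgnoreCase(t.stateCode()))
                .max(Comparator.comparingInt(t -> score(t, intent, state)));
    }

    public static void main(String[] args) {
        List<Template> all = List.of(
                new Template("generic", null, null),
                new Template("intent-only", "REFUND", "ANY"),
                new Template("exact", "REFUND", "CONFIRM"),
                new Template("other", "BILLING", null));
        System.out.println(pick(all, "REFUND", "CONFIRM").map(Template::name).orElse("none")); // prints exact
        System.out.println(pick(all, "REFUND", "FINAL").map(Template::name).orElse("none"));   // prints intent-only
    }
}
```

Scoring by specificity lets a generic fallback template coexist with intent- or state-specific ones without requiring an explicit priority column.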
Responsibility: Write memory/session summary
Session Mutations: memory.session_summary in context
Config/Table Dependencies: ce_memory, ce_audit
Detailed Execution Logic
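Evaluates the rolling history. If ce_memory is configured and recentTurns exceeds the threshold, this step fires off a summarization prompt to the LLM. It compresses the last N turns into a dense paragraph and saves it as memory.session_summary in the context JSON, so long conversations retain their context without exceeding token limits. The summary is truncated to the configured summaryMaxChars, and any previously stored summary returned by a memory store is recalled into memory.recalled_summary.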
public StepResult execute(EngineSession session) {
    if (!flowConfig.getMemory().isEnabled()) {
        return new StepResult.Continue();
    }
    // Recall: take the first non-blank value any store returns
    String recalled = null;
    for (ConversationMemoryStore store : memoryStores) {
        try {
            String value = store.read(session);
            if (value != null && !value.isBlank()) {
                recalled = value;
                break;
            }
        } catch (Exception ignored) {
            // best-effort read; a failing store is skipped
        }
    }
    if (recalled != null) {
        session.putInputParam(ConvEngineInputParamKey.MEMORY_RECALL, recalled);
    }
    // Build the summary and cap it at the configured maximum length
    String summary = buildSummary(session);
    if (summary.length() > flowConfig.getMemory().getSummaryMaxChars()) {
        summary = summary.substring(0, flowConfig.getMemory().getSummaryMaxChars());
    }
    session.putInputParam(ConvEngineInputParamKey.MEMORY_SESSION_SUMMARY, summary);
    ObjectNode root = contextHelper.readRoot(session);
    ObjectNode memoryNode = contextHelper.ensureObject(root, "memory");
    memoryNode.put("session_summary", summary);
    if (recalled != null) {
        memoryNode.put("recalled_summary", recalled);
    }
    contextHelper.writeRoot(session, root);
    for (ConversationMemoryStore store : memoryStores) {
        try {
            store.write(session, summary);
        } catch (Exception ignored) {
            // best-effort write; a failing store is skipped
        }
    }
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("summaryChars", summary.length());
    payload.put("recalled", recalled != null);
    payload.put("stores", memoryStores.size());
    payload.put("intent", session.getIntent());
    payload.put("state", session.getState());
    audit.audit(ConvEngineAuditStage.MEMORY_UPDATED, session.getConversationId(), payload);
    return new StepResult.Continue();
}
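The summary-capping part of this step can be illustrated in isolation. In the sketch below, `cap` mirrors the `substring` truncation in the code above, while `buildSummary` is a deliberately naive, hypothetical stand-in that just joins the most recent turns; the real `buildSummary` is not shown and may call the LLM as described.

```java
import java.util.List;

final class SummaryCapSketch {

    // Keeps at most maxChars characters, mirroring the substring cap in the step.
    static String cap(String summary, int maxChars) {
        if (summary == null) {
            return "";
        }
        return summary.length() > maxChars ? summary.substring(0, maxChars) : summary;
    }

    // Hypothetical stand-in for buildSummary: joins only the most recent turns.
    static String buildSummary(List<String> turns, int recentTurns) {
        int from = Math.max(0, turns.size() - recentTurns);
        return String.join(" | ", turns.subList(from, turns.size()));
    }

    public static void main(String[] args) {
        List<String> turns = List.of("user: hi", "bot: hello", "user: refund order A1");
        String summary = buildSummary(turns, 2);
        System.out.println(summary);          // prints bot: hello | user: refund order A1
        System.out.println(cap(summary, 10)); // prints bot: hello
    }
}
```

A hard character cap is simple but can cut a sentence in half, so a production summary would more likely be shortened at a sentence or turn boundary.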
Responsibility: Persist final conversation and result
Session Mutations: finalResult
Config/Table Dependencies: ce_conversation
Detailed Execution Logic
The database commit step. Writes the CeConversation row, saving the mutated contextJson, inputParams, new intentCode, and stateCode. The step is placed at the end so that if an exception occurs mid-pipeline, the partially mutated context is never written by this step. It also fails fast with PIPELINE_NO_RESPONSE_PAYLOAD if the pipeline reaches this point without a payload.
public StepResult execute(EngineSession session) {
    // --- sanity check ---
    if (session.getPayload() == null) {
        throw new ConversationEngineException(
                ConversationEngineErrorCode.PIPELINE_NO_RESPONSE_PAYLOAD,
                "Engine pipeline ended without payload. ResponseResolutionStep did not run."
        );
    }
    // --- persist conversation ---
    sanitizeConversationForPostgres(session);
    session.getConversation().setStatus("RUNNING");
    session.getConversation().setUpdatedAt(OffsetDateTime.now());
    session.getConversation().setInputParamsJson(session.ejectInputParamsJson());
    conversationRepo.save(session.getConversation());
    // --- build FINAL EngineResult ---
    EngineResult result = new EngineResult(
            session.getIntent(),
            session.getState(),
            session.getPayload(),
            session.getContextJson()
    );
    session.setFinalResult(result);
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FINAL_RESULT, result);
    audit.audit(ConvEngineAuditStage.ENGINE_RETURN, session.getConversationId(), payload);
    return new StepResult.Continue();
}
Responsibility: Timing audit + terminal guard
Session Mutations: timings
Config/Table Dependencies: ce_audit
Detailed Execution Logic
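Timing and safety metrics. Sums the per-step durations (in milliseconds) recorded across the pipeline, from the first step to the last, writes a one-line breakdown to the application log, and records a single compact audit row. In the code shown, that row is written under the PIPELINE_TIMING stage, and the null-payload check is performed earlier, in the persist step, rather than here.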
public StepResult execute(EngineSession session) {
    // Sort by start time just in case
    session.getStepTimings().sort(Comparator.comparingLong(StepTiming::getStartedAtNs));
    long totalMs = session.getStepTimings().stream().mapToLong(StepTiming::getDurationMs).sum();
    // Log in app logs
    String timingLine = session.getStepTimings().stream()
            .map(t -> t.getStepName() + "=" + t.getDurationMs() + "ms" + (t.isSuccess() ? "" : "(ERR)"))
            .reduce((a, b) -> a + ", " + b)
            .orElse("");
    log.info("ConvEngine timings convId={} total={}ms [{}]",
            session.getConversationId(), totalMs, timingLine);
    // Optional audit row (single compact record); inner quotes must be escaped in the Java literal
    String payload = "{\"totalMs\":" + totalMs +
            ",\"steps\":\"" + JsonUtil.escape(timingLine) + "\"}";
    audit.audit(ConvEngineAuditStage.PIPELINE_TIMING, session.getConversationId(), payload);
    return new StepResult.Continue();
}
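The compact timing record can be reproduced outside the engine to see its exact shape. The sketch below is self-contained: the `Timing` record stands in for `StepTiming`, and the `escape` method is a minimal, hypothetical substitute for `JsonUtil.escape`, whose real behaviour is not shown here.

```java
import java.util.List;
import java.util.stream.Collectors;

final class TimingPayloadSketch {

    // Hypothetical stand-in for StepTiming with only the fields used here.
    record Timing(String stepName, long durationMs, boolean success) {}

    // Minimal JSON string escaping; an assumed substitute for JsonUtil.escape.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                default -> sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds the same {"totalMs":..., "steps":"..."} shape as the step's audit payload.
    static String payload(List<Timing> timings) {
        long totalMs = timings.stream().mapToLong(Timing::durationMs).sum();
        String line = timings.stream()
                .map(t -> t.stepName() + "=" + t.durationMs() + "ms" + (t.success() ? "" : "(ERR)"))
                .collect(Collectors.joining(", "));
        return "{\"totalMs\":" + totalMs + ",\"steps\":\"" + escape(line) + "\"}";
    }

    public static void main(String[] args) {
        List<Timing> timings = List.of(
                new Timing("RulesStep", 12, true),
                new Timing("MemoryStep", 30, false));
        // prints {"totalMs":42,"steps":"RulesStep=12ms, MemoryStep=30ms(ERR)"}
        System.out.println(payload(timings));
    }
}
```

Note that the total is the sum of per-step durations, so it excludes any time spent between steps.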