Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions flexus_client_kit/ckit_cloudtool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
CLOUDTOOLS_VECDB = {"flexus_vector_search", "flexus_read_original"}
CLOUDTOOLS_PYTHON = {"python_execute"}
CLOUDTOOLS_WEB = {"web"}
CLOUDTOOLS_NOT_KANBAN = CLOUDTOOLS_VECDB | CLOUDTOOLS_PYTHON | CLOUDTOOLS_WEB
CLOUDTOOLS_AGENTS = {"flexus_hand_over_task"}
CLOUDTOOLS_NOT_KANBAN = CLOUDTOOLS_VECDB | CLOUDTOOLS_PYTHON | CLOUDTOOLS_WEB | CLOUDTOOLS_AGENTS

CLOUDTOOLS_MCP = {"mcp_*"}

CLOUDTOOLS_QUITE_A_LOT = KANBAN_ADVANCED | CLOUDTOOLS_NOT_KANBAN | CLOUDTOOLS_MCP

CLOUDTOOLS_ALL_KNOWN = KANBAN_ALL | CLOUDTOOLS_NOT_KANBAN # unsable in a bot
CLOUDTOOLS_ALL_KNOWN = KANBAN_ALL | CLOUDTOOLS_NOT_KANBAN # unusable in a bot


def gql_error_4xx_to_model_reraise_5xx(e: gql.transport.exceptions.TransportQueryError, label: str) -> str:
Expand Down
Empty file.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
155 changes: 155 additions & 0 deletions flexus_simple_bots/integration_tester/integration_tester_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import asyncio
import json
import sys
from pathlib import Path

# Make the repository root importable so `flexus_client_kit` and
# `flexus_simple_bots` resolve when this file is run directly as a script
# rather than as a package module.
_repo_root = Path(__file__).parents[2]
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

from flexus_client_kit import ckit_bot_exec, ckit_client, ckit_shutdown, ckit_integrations_db
from flexus_client_kit import ckit_bot_version
from flexus_simple_bots.integration_tester import integration_tester_shared as shared
from flexus_simple_bots.integration_tester import integration_tester_install

# Bot identity is derived from this file's location; the shared VERSION file
# lives one directory up (flexus_simple_bots/VERSION).
BOT_NAME = ckit_bot_version.bot_name_from_file(__file__)
BOT_VERSION = (Path(__file__).parents[1] / "VERSION").read_text().strip()
# JSON schema describing the persona-setup fields this bot accepts;
# mixed with the live persona setup in the main loop.
SETUP_SCHEMA = json.loads((Path(__file__).parent / "setup_schema.json").read_text())


async def integration_tester_main_loop(
    fclient: ckit_client.FlexusClient,
    rcx: ckit_bot_exec.RobotContext,
) -> None:
    """Per-persona main loop for the Integration Tester bot.

    Initializes the allowed integrations, wraps each integration tool with a
    testing wrapper, registers the batch-planning tool and a task-update
    logger, then pumps collected events until shutdown is signalled.
    """
    # Merge the static setup schema with this persona's configured setup.
    setup = ckit_bot_exec.official_setup_mixing_procedure(SETUP_SCHEMA, rcx.persona.persona_setup)
    shared.load_env_config(setup)

    # Optionally narrow the integration records to the persona's allowlist.
    integr_records = shared.INTEGRATION_TESTER_INTEGRATIONS
    setup_allow = shared._setup_allowlist_names(setup)
    if setup_allow:
        allow = set(setup_allow)
        integr_records = [r for r in integr_records if r.integr_name in allow]

    # NOTE(review): return value is unused here; presumably the call registers
    # the integrations on rcx as a side effect — confirm in ckit_integrations_db.
    integr_objects = await ckit_integrations_db.main_loop_integrations_init(integr_records, rcx, setup)
    supported_integrations = sorted({r.integr_name for r in integr_records})

    # Re-register every integration tool behind a testing wrapper that records
    # pass/fail info. NOTE(review): reads the private `_handler_per_tool`
    # mapping of RobotContext — consider a public accessor.
    for rec in integr_records:
        for tool in rec.integr_tools:
            original_handler = rcx._handler_per_tool.get(tool.name)
            if original_handler:
                rcx.on_tool_call(tool.name)(
                    shared.make_testing_wrapper(
                        original_handler,
                        rec.integr_name,
                        tool.name,
                    )
                )

    @rcx.on_tool_call(shared.PLAN_BATCHES_TOOL.name)
    async def toolcall_plan_batches(toolcall, model_produced_args):
        """Plan deterministic test batches; returns a JSON report string."""
        args = model_produced_args or {}
        # "requested" is "all" or a comma-separated list of integration names.
        req = shared._requested_names(str(args.get("requested", "all")))
        bs = args.get("batch_size", 5)
        configured_only = bool(args.get("configured_only", True))
        try:
            bs = int(bs)
        except (TypeError, ValueError):
            # Fall back to the default batch size on a non-numeric value.
            bs = 5

        # Names of integrations that actually have credentials configured.
        configured = {x["name"] for x in shared.get_configured_integrations(supported_integrations)}
        selected = []
        unsupported = []

        if "all" in req:
            pool = [x for x in supported_integrations if (x in configured or not configured_only)]
            selected = pool
        else:
            for x in req:
                if x not in supported_integrations:
                    unsupported.append(x)
                    continue
                if configured_only and x not in configured:
                    continue
                if x not in selected:  # preserve request order, drop duplicates
                    selected.append(x)

        batches = shared._chunk_names(selected, bs)
        # Map each integration to its first tool name for the task description.
        tool_name_by_integr = {r.integr_name: r.integr_tools[0].name for r in integr_records if r.integr_tools}
        task_specs = []
        total = len(batches)
        for i, b in enumerate(batches, start=1):
            tool_map = ", ".join(f"{name}->{tool_name_by_integr[name]}" for name in b)
            task_specs.append({
                "title": f"Test integrations batch {i}/{total}",
                "description": f"Integrations: {','.join(b)}\nTool mapping: {tool_map}",
                "integrations": b,
            })

        return json.dumps({
            "ok": True,
            "requested": req,
            "supported": supported_integrations,
            "configured": sorted(configured),
            "configured_only": configured_only,
            "selected": selected,
            "unsupported": unsupported,
            "batch_size": bs,
            "batches": batches,
            "task_specs": task_specs,
        }, indent=2)

    configured = shared.get_configured_integrations(supported_integrations)
    shared.logger.info(f"Integration Tester started. Configured integrations: {[i['name'] for i in configured]}")

    @rcx.on_updated_task
    async def on_task_update(action, old_task, new_task):
        """Log kanban task transitions for observability; no state changes."""
        task = new_task or old_task
        if not task:
            shared.logger.info(f"TASK UPDATE: {action} with no task payload")
            return
        col = task.calc_bucket()
        title = task.ktask_title
        tid = task.ktask_id
        if col == "inprogress":
            shared.logger.info(f"TASK ASSIGNED: {title} (id={tid}) - will test now")
        elif col == "done":
            shared.logger.info(f"TASK COMPLETED: {title} (id={tid})")
        else:
            shared.logger.info(f"TASK UPDATE: {title} moved to {col} (id={tid})")

    # Event pump: process collected events until a shutdown is requested.
    while not ckit_shutdown.shutdown_event.is_set():
        await rcx.unpark_collected_events(sleep_if_no_work=10.0)

    shared.logger.info(f"{rcx.persona.persona_id} exit")


def main():
    """CLI entry point: parse args, load env config, and run the bot group."""
    scenario_fn = ckit_bot_exec.parse_bot_args()

    # Load .env BEFORE constructing the client, so any credentials/endpoints
    # read from the environment at construction time are already present.
    # This matches the ordering used by integration_tester_install.__main__.
    from dotenv import load_dotenv
    load_dotenv()

    fclient = ckit_client.FlexusClient(
        ckit_client.bot_service_name(BOT_NAME, BOT_VERSION),
        endpoint="/v1/jailed-bot",
    )

    async def _install_compat(client: ckit_client.FlexusClient) -> int:
        """Adapter: run_bots_in_this_group expects an install callable that
        returns an int exit code; delegate to the package installer."""
        await integration_tester_install.install(
            client,
            bot_name=BOT_NAME,
            bot_version=BOT_VERSION,
            tools=shared.TOOLS,
        )
        return 0

    asyncio.run(ckit_bot_exec.run_bots_in_this_group(
        fclient,
        bot_main_loop=integration_tester_main_loop,
        inprocess_tools=shared.TOOLS,
        scenario_fn=scenario_fn,
        install_func=_install_compat,
    ))


if __name__ == "__main__":
main()
171 changes: 171 additions & 0 deletions flexus_simple_bots/integration_tester/integration_tester_install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import asyncio
import json
import logging
import os
from typing import List

from flexus_client_kit import ckit_client, ckit_bot_install, ckit_cloudtool, ckit_skills
from flexus_simple_bots import prompts_common
from flexus_simple_bots.integration_tester import integration_tester_shared as shared

# Package-wide logger for the integration_tester bot.
logger = logging.getLogger("integration_tester")

# Static skills bundled under the bot's root directory.
# NOTE(review): empty allowlist strings presumably mean "no shared/integration
# skills included" — confirm against ckit_skills.static_skills_find.
INTEGRATION_TESTER_SKILLS = ckit_skills.static_skills_find(shared.INTEGRATION_TESTER_ROOTDIR, shared_skills_allowlist="", integration_skills_allowlist="")


def _build_experts(tools):
    """Build the (name, FMarketplaceExpertInput) pairs for this bot.

    Returns the interactive "default" expert (plans batches, reports results)
    and the "autonomous" expert (executes one kanban batch task).
    NOTE(review): the `tools` parameter is currently unused — tool filtering
    happens in the caller via `exp.filter_tools(tools)`; kept for interface
    stability.
    """
    builtin_skills = ckit_skills.read_name_description(shared.INTEGRATION_TESTER_ROOTDIR, INTEGRATION_TESTER_SKILLS)
    tool_names = {reg["tool"].name for reg in shared.INTEGRATION_REGISTRY.values()}
    tool_names.add(shared.PLAN_BATCHES_TOOL.name)
    # Use the named constant from ckit_cloudtool instead of hardcoding the
    # hand-over tool name, so the allowlist tracks the kit's definition.
    allow_tools = ",".join(tool_names | ckit_cloudtool.KANBAN_ADVANCED | ckit_cloudtool.CLOUDTOOLS_AGENTS)

    default_prompt = """You are Integration Tester. Your job is to queue autonomous smoke tests for supported API-key integrations and then report the finished results clearly.

Rules:
- Supported requests are: "all" or a comma-separated list of supported integration names.
- First call integration_plan_batches(requested="...", batch_size=5, configured_only=true).
- Use every returned task_spec to create a task with flexus_hand_over_task(to_bot="Integration Tester", title=..., description=..., fexp_name="autonomous").
- Do not run integration tools in this interactive chat. This chat only plans work and reports completed task results.
- If nothing supported/configured was selected, explain that briefly and stop.
- Mention unsupported requested names if any.

After queueing tasks, reply in this format:
Queued {{N}} batch covering {{X}} integrations: {{name1}} and {{name2}}.

Detailed per-integration results will appear here after the autonomous worker finishes.

When a completed-task message arrives:
- read resolution_summary
- present it as a markdown table if it is a table, otherwise give a short plain summary
- do not dump raw task metadata
"""

    autonomous_prompt = """You are Integration Tester smoke test orchestrator. You own one kanban task.

Parse integrations from task description "Integrations: name1,name2,..." and optional "Tool mapping: ..." line.

For each integration:
1. Call op=help to discover available operations
2. Call op=list_methods to see the method catalog
3. Pick 3 different read-only operations that return real provider data (not help, not local status like has_api_key, ready, configured, method_count)
4. Execute all 3 calls and collect results

Classification:
- PASSED: at least 1 of the 3 calls succeeded with real provider data
- FAILED: all 3 calls failed or errored
- Build a markdown table: Integration | Status | Details

Resolve with flexus_kanban_advanced:
- resolution_code=PASSED only if ALL integrations PASSED
- resolution_summary=<the markdown table>

Do not hand over, delegate, or wait for user input.
"""

    return [
        ("default", ckit_bot_install.FMarketplaceExpertInput(
            fexp_system_prompt=default_prompt,
            fexp_python_kernel="",
            fexp_allow_tools=allow_tools,
            fexp_nature="NATURE_INTERACTIVE",
            fexp_builtin_skills=builtin_skills,
            fexp_description="Test API key integrations",
        )),
        ("autonomous", ckit_bot_install.FMarketplaceExpertInput(
            fexp_system_prompt=autonomous_prompt,
            fexp_python_kernel="",
            fexp_allow_tools=allow_tools,
            fexp_nature="NATURE_AUTONOMOUS",
            fexp_inactivity_timeout=600,
            fexp_builtin_skills=builtin_skills,
            fexp_description="Autonomous integration testing",
        )),
    ]


# Marketplace listing text (markdown) passed as marketable_description in install().
INTEGRATION_TESTER_DESC = """
**Job description**

Integration Tester validates that Flexus API key-based integrations are properly configured and functional.
It only tests integrations that are explicitly allowed for this bot and have API keys provided through ENV_CONFIG.

**How it works:**
1. User starts a test session via "Test Integrations" button
2. Bot checks which supported integrations are configured
3. User selects what to test (all or specific supported integrations)
4. Bot creates deterministic kanban batch tasks in inbox
5. Autonomous worker discovers safe operations, runs at least one real read-only API call per integration, and resolves the task with a table of results

**What it tests:**
- Any integration included in this bot's supported allowlist
- Real read-only operations only
- No create/update/delete/send actions

**Results:**
- PASSED: A real non-help read-only call succeeded
- FAILED: A real non-help call failed
- UNTESTED: Only discovery calls were made, so the integration was not actually tested
"""


def _ensure_marketplace_images() -> None:
    """Ensure both marketplace artwork files exist, copying bob's images as
    a fallback for any that are missing."""
    rootdir = shared.INTEGRATION_TESTER_ROOTDIR
    bob_dir = rootdir.parent / "bob"
    image_pairs = (
        (rootdir / "integration_tester-1024x1536.webp", bob_dir / "bob-1024x1536.webp"),
        (rootdir / "integration_tester-256x256.webp", bob_dir / "bob-256x256.webp"),
    )
    for target, fallback in image_pairs:
        # Only fill a missing target, and only when the fallback is available.
        if not target.exists() and fallback.exists():
            target.write_bytes(fallback.read_bytes())


async def install(
    client: ckit_client.FlexusClient,
    bot_name: str,
    bot_version: str,
    tools: List[ckit_cloudtool.CloudTool],
):
    """Register/update the Integration Tester bot in the dev marketplace.

    Loads the default setup from setup_schema.json, ensures marketplace
    artwork exists, builds the expert definitions, and upserts the bot with
    its schedule, featured actions, and per-expert filtered tool lists.
    NOTE(review): bot_name/bot_version parameters are accepted but not passed
    through to marketplace_upsert_dev_bot — confirm whether that is intended.
    """
    setup_schema_path = shared.INTEGRATION_TESTER_ROOTDIR / "setup_schema.json"
    # The raw schema doubles as the default setup payload for new installs.
    integration_tester_setup_default = json.loads(setup_schema_path.read_text())

    _ensure_marketplace_images()

    experts = _build_experts(tools)

    await ckit_bot_install.marketplace_upsert_dev_bot(
        client,
        ws_id=client.ws_id,
        bot_dir=shared.INTEGRATION_TESTER_ROOTDIR,
        marketable_title1="Integration Tester",
        marketable_title2="Test API key integrations",
        marketable_author="Flexus",
        marketable_accent_color="#4CAF50",
        marketable_occupation="QA Engineer",
        marketable_description=INTEGRATION_TESTER_DESC,
        marketable_typical_group="Development",
        # Base schedule templates overridden to run every minute; task sorting
        # goes to the interactive expert, todo processing to the autonomous one.
        marketable_schedule=[
            prompts_common.SCHED_TASK_SORT_10M | {"sched_when": "EVERY:1m", "sched_fexp_name": "default"},
            prompts_common.SCHED_TODO_5M | {"sched_when": "EVERY:1m", "sched_fexp_name": "autonomous"},
        ],
        marketable_setup_default=integration_tester_setup_default,
        marketable_featured_actions=[
            {"feat_question": "Test all integrations", "feat_expert": "default"},
            {"feat_question": "Test newsapi", "feat_expert": "default"},
            {"feat_question": "Test resend", "feat_expert": "default"},
        ],
        marketable_intro_message="Hi! I'm Integration Tester. I create deterministic kanban batch tasks and resolve them autonomously.",
        marketable_preferred_model_expensive="gpt-5.4-mini",
        marketable_preferred_model_cheap="gpt-5.4-mini",
        # Each expert only exposes the tools permitted by its own allowlist.
        marketable_experts=[(name, exp.filter_tools(tools)) for name, exp in experts],
        marketable_tags=["testing", "integrations", "qa"],
        marketable_forms=ckit_bot_install.load_form_bundles(__file__),
    )


# Script entry point: install the bot directly using local .env credentials.
if __name__ == "__main__":
    from dotenv import load_dotenv
    load_dotenv()

    client = ckit_client.FlexusClient("integration_tester_install")
    asyncio.run(install(client, bot_name="integration_tester", bot_version="dev", tools=shared.TOOLS))
Loading