feat: add note mode and workflow config

This commit is contained in:
shiyue
2026-06-12 16:39:44 +08:00
parent 5308a10b52
commit 8b2173be8f
93 changed files with 15292 additions and 50 deletions

View File

@@ -0,0 +1,641 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-Platform Sub-Agent Context Injection Hook
Injects task-specific context when sub-agents (implement, check, research) are spawned.
Core Design Philosophy:
- Hook is responsible for injecting all context, subagent works autonomously with complete info
- Each agent has a dedicated jsonl file defining its context
- No resume needed, no segmentation, behavior controlled by code not prompt
Trigger: PreToolUse (before Task tool call)
Context Source: .trellis/.current-task points to task directory
- implement.jsonl - Implement agent dedicated context
- check.jsonl - Check agent dedicated context
- prd.md - Requirements document
- info.md - Technical design
- codex-review-output.txt - Code Review results
"""
from __future__ import annotations
# IMPORTANT: Suppress all warnings FIRST
import warnings
warnings.filterwarnings("ignore")
import json
import os
import sys
from pathlib import Path
# IMPORTANT: Force stdout to use UTF-8 on Windows
# This fixes UnicodeEncodeError when outputting non-ASCII characters
if sys.platform.startswith("win"):
import io as _io
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace") # type: ignore[union-attr]
elif hasattr(sys.stdout, "detach"):
sys.stdout = _io.TextIOWrapper(sys.stdout.detach(), encoding="utf-8", errors="replace") # type: ignore[union-attr]
# =============================================================================
# Path Constants (change here to rename directories)
# =============================================================================
DIR_WORKFLOW = ".trellis"
DIR_SPEC = "spec"
FILE_CURRENT_TASK = ".current-task"
FILE_TASK_JSON = "task.json"
# =============================================================================
# Subagent Constants (change here to rename subagent types)
# =============================================================================
AGENT_IMPLEMENT = "trellis-implement"
AGENT_CHECK = "trellis-check"
AGENT_RESEARCH = "trellis-research"
# Agents that require a task directory
AGENTS_REQUIRE_TASK = (AGENT_IMPLEMENT, AGENT_CHECK)
# All supported agents
AGENTS_ALL = (AGENT_IMPLEMENT, AGENT_CHECK, AGENT_RESEARCH)
def find_repo_root(start_path: str) -> str | None:
"""
Find git repo root from start_path upwards
Returns:
Repo root path, or None if not found
"""
current = Path(start_path).resolve()
while current != current.parent:
if (current / ".git").exists():
return str(current)
current = current.parent
return None
def get_current_task(repo_root: str) -> str | None:
"""
Read current task directory path from .trellis/.current-task
Returns:
Task directory relative path (relative to repo_root)
None if not set
"""
current_task_file = os.path.join(repo_root, DIR_WORKFLOW, FILE_CURRENT_TASK)
if not os.path.exists(current_task_file):
return None
try:
with open(current_task_file, "r", encoding="utf-8") as f:
content = f.read().strip()
if not content:
return None
normalized = content.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
if normalized.startswith("tasks/"):
normalized = f".trellis/{normalized}"
return normalized
except Exception:
return None
def read_file_content(base_path: str, file_path: str) -> str | None:
"""Read file content, return None if file doesn't exist"""
full_path = os.path.join(base_path, file_path)
if os.path.exists(full_path) and os.path.isfile(full_path):
try:
with open(full_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return None
return None
def read_directory_contents(
base_path: str, dir_path: str, max_files: int = 20
) -> list[tuple[str, str]]:
"""
Read all .md files in a directory
Args:
base_path: Base path (usually repo_root)
dir_path: Directory relative path
max_files: Max files to read (prevent huge directories)
Returns:
[(file_path, content), ...]
"""
full_path = os.path.join(base_path, dir_path)
if not os.path.exists(full_path) or not os.path.isdir(full_path):
return []
results = []
try:
# Only read .md files, sorted by filename
md_files = sorted(
[
f
for f in os.listdir(full_path)
if f.endswith(".md") and os.path.isfile(os.path.join(full_path, f))
]
)
for filename in md_files[:max_files]:
file_full_path = os.path.join(full_path, filename)
relative_path = os.path.join(dir_path, filename)
try:
with open(file_full_path, "r", encoding="utf-8") as f:
content = f.read()
results.append((relative_path, content))
except Exception:
continue
except Exception:
pass
return results
def read_jsonl_entries(base_path: str, jsonl_path: str) -> list[tuple[str, str]]:
"""
Read all file/directory contents referenced in jsonl file
Schema:
{"file": "path/to/file.md", "reason": "..."}
{"file": "path/to/dir/", "type": "directory", "reason": "..."}
{"_example": "..."} # seed row — skipped (no `file` field)
Rows without a ``file`` field (e.g. the self-describing seed line written
by ``task.py create`` before the agent has curated entries) are skipped
silently. If the resulting entry list is empty, a stderr warning is
emitted so the operator can debug missing context.
Returns:
[(path, content), ...]
"""
full_path = os.path.join(base_path, jsonl_path)
if not os.path.exists(full_path):
print(
f"[inject-subagent-context] WARN: {jsonl_path} not found — "
f"sub-agent will receive only prd.md",
file=sys.stderr,
)
return []
results = []
saw_real_entry = False
try:
with open(full_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
item = json.loads(line)
file_path = item.get("file") or item.get("path")
entry_type = item.get("type", "file")
if not file_path:
# Seed / comment row — skip silently
continue
saw_real_entry = True
if entry_type == "directory":
# Read all .md files in directory
dir_contents = read_directory_contents(base_path, file_path)
results.extend(dir_contents)
else:
# Read single file
content = read_file_content(base_path, file_path)
if content:
results.append((file_path, content))
except json.JSONDecodeError:
continue
except Exception:
pass
if not saw_real_entry:
print(
f"[inject-subagent-context] WARN: {jsonl_path} has no curated "
f"entries (only seed / empty) — sub-agent will receive only "
f"prd.md. See workflow.md Phase 1.3 for curation guidance.",
file=sys.stderr,
)
return results
def get_agent_context(repo_root: str, task_dir: str, agent_type: str) -> str:
"""
Get context from {agent_type}.jsonl for the specified agent.
Only reads implement.jsonl or check.jsonl (the two JSONL files the task system creates).
"""
context_parts = []
agent_jsonl = f"{task_dir}/{agent_type}.jsonl"
for file_path, content in read_jsonl_entries(repo_root, agent_jsonl):
context_parts.append(f"=== {file_path} ===\n{content}")
return "\n\n".join(context_parts)
def get_implement_context(repo_root: str, task_dir: str) -> str:
"""
Complete context for Implement Agent
Read order:
1. All files in implement.jsonl (dev specs)
2. prd.md (requirements)
3. info.md (technical design)
"""
context_parts = []
# 1. Read implement.jsonl
base_context = get_agent_context(repo_root, task_dir, "implement")
if base_context:
context_parts.append(base_context)
# 2. Requirements document
prd_content = read_file_content(repo_root, f"{task_dir}/prd.md")
if prd_content:
context_parts.append(f"=== {task_dir}/prd.md (Requirements) ===\n{prd_content}")
# 3. Technical design
info_content = read_file_content(repo_root, f"{task_dir}/info.md")
if info_content:
context_parts.append(
f"=== {task_dir}/info.md (Technical Design) ===\n{info_content}"
)
return "\n\n".join(context_parts)
def get_check_context(repo_root: str, task_dir: str) -> str:
"""
Context for Check Agent: check.jsonl + prd.md
"""
context_parts = []
for file_path, content in read_jsonl_entries(repo_root, f"{task_dir}/check.jsonl"):
context_parts.append(f"=== {file_path} ===\n{content}")
prd_content = read_file_content(repo_root, f"{task_dir}/prd.md")
if prd_content:
context_parts.append(f"=== {task_dir}/prd.md (Requirements) ===\n{prd_content}")
return "\n\n".join(context_parts)
def get_finish_context(repo_root: str, task_dir: str) -> str:
"""
Context for Finish phase: reuses check.jsonl + prd.md
(Finish is a final check, same context source.)
"""
return get_check_context(repo_root, task_dir)
def build_implement_prompt(original_prompt: str, context: str) -> str:
"""Build complete prompt for Implement"""
return f"""# Implement Agent Task
You are the Implement Agent in the Multi-Agent Pipeline.
## Your Context
All the information you need has been prepared for you:
{context}
---
## Your Task
{original_prompt}
---
## Workflow
1. **Understand specs** - All dev specs are injected above, understand them
2. **Understand requirements** - Read requirements document and technical design
3. **Implement feature** - Implement following specs and design
4. **Self-check** - Ensure code quality against check specs
## Important Constraints
- Do NOT execute git commit, only code modifications
- Follow all dev specs injected above
- Report list of modified/created files when done"""
def build_check_prompt(original_prompt: str, context: str) -> str:
"""Build complete prompt for Check"""
return f"""# Check Agent Task
You are the Check Agent in the Multi-Agent Pipeline (code and cross-layer checker).
## Your Context
All check specs and dev specs you need:
{context}
---
## Your Task
{original_prompt}
---
## Workflow
1. **Get changes** - Run `git diff --name-only` and `git diff` to get code changes
2. **Check against specs** - Check item by item against specs above
3. **Self-fix** - Fix issues directly, don't just report
4. **Run verification** - Run project's lint and typecheck commands
## Important Constraints
- Fix issues yourself, don't just report
- Must execute complete checklist in check specs
- Pay special attention to impact radius analysis (L1-L5)"""
def build_finish_prompt(original_prompt: str, context: str) -> str:
"""Build complete prompt for Finish (final check before PR)"""
return f"""# Finish Agent Task
You are performing the final check before creating a PR.
## Your Context
Finish checklist and requirements:
{context}
---
## Your Task
{original_prompt}
---
## Workflow
1. **Review changes** - Run `git diff --name-only` to see all changed files
2. **Verify requirements** - Check each requirement in prd.md is implemented
3. **Spec sync** - Analyze whether changes introduce new patterns, contracts, or conventions
- If new pattern/convention found: read target spec file → update it → update index.md if needed
- If infra/cross-layer change: follow the 7-section mandatory template from update-spec.md
- If pure code fix with no new patterns: skip this step
4. **Run final checks** - Execute lint and typecheck
5. **Confirm ready** - Ensure code is ready for PR
## Important Constraints
- You MAY update spec files when gaps are detected (use update-spec.md as guide)
- MUST read the target spec file BEFORE editing (avoid duplicating existing content)
- Do NOT update specs for trivial changes (typos, formatting, obvious fixes)
- If critical CODE issues found, report them clearly (fix specs, not code)
- Verify all acceptance criteria in prd.md are met"""
def get_research_context(repo_root: str, task_dir: str | None) -> str:
"""
Context for Research Agent — project structure overview for spec directories.
`task_dir` kept for signature parity with get_implement_context / get_check_context
so the dispatcher can call them uniformly.
"""
_ = task_dir
context_parts = []
# 1. Project structure overview (dynamically discover spec directories)
spec_path = f"{DIR_WORKFLOW}/{DIR_SPEC}"
spec_root = Path(repo_root) / DIR_WORKFLOW / DIR_SPEC
# Build spec tree dynamically
tree_lines = [f"{spec_path}/"]
if spec_root.is_dir():
pkg_dirs = sorted(d for d in spec_root.iterdir() if d.is_dir())
for i, pkg_dir in enumerate(pkg_dirs):
is_last = i == len(pkg_dirs) - 1
prefix = "└── " if is_last else "├── "
layers = sorted(d.name for d in pkg_dir.iterdir() if d.is_dir())
layer_info = f" ({', '.join(layers)})" if layers else ""
tree_lines.append(f"{prefix}{pkg_dir.name}/{layer_info}")
spec_tree = "\n".join(tree_lines)
project_structure = f"""## Project Spec Directory Structure
```
{spec_tree}
```
To get structured package info, run: `python3 ./{DIR_WORKFLOW}/scripts/get_context.py --mode packages`
## Search Tips
- Spec files: `{spec_path}/**/*.md`
- Code search: Use Glob and Grep tools
- Tech solutions: Use mcp__exa__web_search_exa or mcp__exa__get_code_context_exa"""
context_parts.append(project_structure)
return "\n\n".join(context_parts)
def build_research_prompt(original_prompt: str, context: str) -> str:
"""Build complete prompt for Research"""
return f"""# Research Agent Task
You are the Research Agent in the Multi-Agent Pipeline (search researcher).
## Core Principle
**You do one thing: find and explain information.**
You are a documenter, not a reviewer.
## Project Info
{context}
---
## Your Task
{original_prompt}
---
## Workflow
1. **Understand query** - Determine search type (internal/external) and scope
2. **Plan search** - List search steps for complex queries
3. **Execute search** - Execute multiple independent searches in parallel
4. **Organize results** - Output structured report
## Search Tools
| Tool | Purpose |
|------|---------|
| Glob | Search by filename pattern |
| Grep | Search by content |
| Read | Read file content |
| mcp__exa__web_search_exa | External web search |
| mcp__exa__get_code_context_exa | External code/doc search |
## Strict Boundaries
**Only allowed**: Describe what exists, where it is, how it works
**Forbidden** (unless explicitly asked):
- Suggest improvements
- Criticize implementation
- Recommend refactoring
- Modify any files
## Report Format
Provide structured search results including:
- List of files found (with paths)
- Code pattern analysis (if applicable)
- Related spec documents
- External references (if any)"""
def _parse_hook_input(input_data: dict) -> tuple[str, str, dict]:
"""Parse hook input across different platform formats.
Returns (subagent_type, original_prompt, tool_input).
Handles:
- Claude Code / Qoder / CodeBuddy / Droid: tool_name=Task|Agent, tool_input.subagent_type
- Cursor: tool_name=Task, tool_input.subagent_type
- Copilot CLI: toolName=task (camelCase key, lowercase value)
- Gemini CLI: tool_name IS the agent name (BeforeTool matcher already filtered)
- Kiro: agentSpawn hook, agent_name field at top level
"""
tool_input = input_data.get("tool_input", {})
# Standard format: Task/Agent tool with subagent_type
tool_name = input_data.get("tool_name", "") or input_data.get("toolName", "")
if tool_name.lower() in ("task", "agent"):
return (
tool_input.get("subagent_type", ""),
tool_input.get("prompt", ""),
tool_input,
)
# Kiro: agentSpawn hook passes agent_name at top level
agent_name = input_data.get("agent_name", "")
if agent_name:
return agent_name, tool_input.get("prompt", input_data.get("prompt", "")), tool_input
# Gemini CLI: BeforeTool where tool_name IS the agent name
# (matcher already ensured it's one of our agents)
if tool_name in AGENTS_ALL:
return tool_name, tool_input.get("prompt", ""), tool_input
# Copilot CLI: toolName field (camelCase), value might be the agent name
tool_name_camel = input_data.get("toolName", "")
if tool_name_camel in AGENTS_ALL:
return tool_name_camel, input_data.get("toolArgs", ""), tool_input
return "", "", tool_input
def main():
try:
input_data = json.load(sys.stdin)
except json.JSONDecodeError:
sys.exit(0)
subagent_type, original_prompt, tool_input = _parse_hook_input(input_data)
cwd = input_data.get("cwd", os.getcwd())
# Only handle subagent types we care about
if subagent_type not in AGENTS_ALL:
sys.exit(0)
# Find repo root
repo_root = find_repo_root(cwd)
if not repo_root:
sys.exit(0)
# Get current task directory (research doesn't require it)
task_dir = get_current_task(repo_root)
# implement/check need task directory
if subagent_type in AGENTS_REQUIRE_TASK:
if not task_dir:
sys.exit(0)
# Check if task directory exists
task_dir_full = os.path.join(repo_root, task_dir)
if not os.path.exists(task_dir_full):
sys.exit(0)
# Check for [finish] marker in prompt (check agent with finish context)
is_finish_phase = "[finish]" in original_prompt.lower()
# Get context and build prompt based on subagent type
if subagent_type == AGENT_IMPLEMENT:
assert task_dir is not None # validated above
context = get_implement_context(repo_root, task_dir)
new_prompt = build_implement_prompt(original_prompt, context)
elif subagent_type == AGENT_CHECK:
assert task_dir is not None # validated above
if is_finish_phase:
# Finish phase: use finish context (lighter, focused on final verification)
context = get_finish_context(repo_root, task_dir)
new_prompt = build_finish_prompt(original_prompt, context)
else:
# Regular check phase: use check context (full specs for self-fix loop)
context = get_check_context(repo_root, task_dir)
new_prompt = build_check_prompt(original_prompt, context)
elif subagent_type == AGENT_RESEARCH:
# Research can work without task directory
context = get_research_context(repo_root, task_dir)
new_prompt = build_research_prompt(original_prompt, context)
else:
sys.exit(0)
if not context:
sys.exit(0)
# Return updated input — use a multi-format output that covers all platforms.
# Most platforms ignore unrecognized fields, so we include multiple formats.
# The platform picks whichever fields it understands.
updated = {**tool_input, "prompt": new_prompt}
output = {
# Claude Code / Qoder / CodeBuddy / Droid format
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"updatedInput": updated,
},
# Cursor format
"permission": "allow",
"updated_input": updated,
# Gemini format
"updatedInput": updated,
}
print(json.dumps(output, ensure_ascii=False))
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""Trellis UserPromptSubmit hook: inject per-turn workflow breadcrumb.
Runs on every user prompt. Reads the active task (.trellis/.current-task)
and emits a short <workflow-state> block reminding the main AI what task
is active and its expected flow. Breadcrumb text is pulled from
workflow.md [workflow-state:STATUS] tag blocks (single source of truth
for users who fork the Trellis workflow), with hardcoded fallbacks so
the hook never breaks when workflow.md is missing or malformed.
Shared across all hook-capable platforms (Claude, Cursor, Codex, Qoder,
CodeBuddy, Droid, Gemini, Copilot). Kiro is not wired (no per-turn
hook entry point). Written to each platform's hooks directory via
writeSharedHooks() at init time.
Silent exit 0 cases (no output):
- No .trellis/ directory found (not a Trellis project)
- No .current-task file, or it's empty
- task.json malformed or missing status
Unknown status (no tag + no hardcoded fallback) emits a generic
breadcrumb rather than silent-exiting, so custom statuses surface in
the UI instead of appearing as "randomly broken".
"""
from __future__ import annotations
import json
import os
import re
import sys
from pathlib import Path
from typing import Optional, Tuple
# ---------------------------------------------------------------------------
# CWD-robust Trellis root discovery (fixes hook-path-robustness for this hook)
# ---------------------------------------------------------------------------
def find_trellis_root(start: Path) -> Optional[Path]:
"""Walk up from start to find directory containing .trellis/.
Handles CWD drift: subdirectory launches, monorepo packages, etc.
Returns None if no .trellis/ found (silent no-op).
"""
cur = start.resolve()
while cur != cur.parent:
if (cur / ".trellis").is_dir():
return cur
cur = cur.parent
return None
# ---------------------------------------------------------------------------
# Active task discovery
# ---------------------------------------------------------------------------
def _normalize_task_ref(task_ref: str) -> str:
"""Normalize .current-task path ref.
Accepts:
- Absolute paths (left as-is)
- Windows-style backslashes (converted to forward slash)
- Legacy relative refs like "tasks/foo" (prefixed with .trellis/)
"""
normalized = task_ref.strip()
if not normalized:
return ""
path_obj = Path(normalized)
if path_obj.is_absolute():
return str(path_obj)
normalized = normalized.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
if normalized.startswith("tasks/"):
normalized = f".trellis/{normalized}"
return normalized
def get_active_task(root: Path) -> Optional[Tuple[str, str]]:
"""Return (task_id, status) from the current active task, else None.
Reads .trellis/.current-task (a path relative to root, e.g.
".trellis/tasks/04-17-foo") then that task's task.json.
Normalizes backslashes so Windows paths work on Unix and vice versa.
"""
ref_file = root / ".trellis" / ".current-task"
if not ref_file.is_file():
return None
try:
raw = ref_file.read_text(encoding="utf-8").strip()
except OSError:
return None
task_ref = _normalize_task_ref(raw)
if not task_ref:
return None
path_obj = Path(task_ref)
task_dir = path_obj if path_obj.is_absolute() else root / path_obj
task_json = task_dir / "task.json"
if not task_json.is_file():
return None
try:
data = json.loads(task_json.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
task_id = data.get("id") or task_dir.name
status = data.get("status", "")
if not isinstance(status, str) or not status:
return None
return task_id, status
# ---------------------------------------------------------------------------
# Breadcrumb loading: parse workflow.md, fall back to hardcoded defaults
# ---------------------------------------------------------------------------
# Supports STATUS values with letters, digits, underscores, hyphens
# (so "in-review" / "blocked-by-team" work alongside "in_progress").
_TAG_RE = re.compile(
r"\[workflow-state:([A-Za-z0-9_-]+)\]\s*\n(.*?)\n\s*\[/workflow-state:\1\]",
re.DOTALL,
)
# Hardcoded defaults for built-in Trellis statuses. Used when workflow.md is
# missing, malformed, or lacks the tag for this status.
#
# `no_task` is a pseudo-status emitted when .current-task is missing — it keeps
# the Next-Action reminder flowing per-turn even without an active task.
_FALLBACK_BREADCRUMBS = {
"no_task": (
"No active task.\n"
"Trigger words in the user message that REQUIRE creating a task "
"(non-negotiable, do NOT self-exempt): 重构 / 抽成 / 独立 / 分发 / "
"拆出来 / 搞一个 / 做成 / 接入 / 集成 / refactor / rewrite / extract / "
"productize / publish / build X / design Y.\n"
"Task is NOT required ONLY if ALL three hold: (a) zero file writes "
"this turn, (b) answer fits in one reply with no multi-round plan, "
"(c) no research beyond reading 1-2 repo files.\n"
"When in doubt: create task. Over-tasking is cheap; under-tasking "
"leaks plans and research into main context.\n"
"Flow: load `trellis-brainstorm` skill → it creates the task via "
"`python3 ./.trellis/scripts/task.py create` and drives requirements Q&A. "
"For research-heavy work (tool comparison, docs, cross-platform survey), "
"spawn `trellis-research` sub-agents via Task tool — NEVER do 3+ inline "
"WebFetch/WebSearch/`gh api` calls in the main conversation."
),
"planning": (
"Complete prd.md via trellis-brainstorm skill; then run task.py start.\n"
"Research belongs in `{task_dir}/research/*.md`, written by "
"`trellis-research` sub-agents. Do NOT inline WebFetch/WebSearch in "
"main session — PRD only links to research files."
),
"in_progress": (
"Flow: trellis-implement → trellis-check → trellis-update-spec → finish\n"
"Next required action: inspect conversation history + git status, then "
"execute the next uncompleted step in that sequence.\n"
"For agent-capable platforms, do NOT edit code in the main session; "
"dispatch `trellis-implement` for implementation and dispatch "
"`trellis-check` before reporting completion."
),
"completed": (
"User commits changes; then run task.py archive."
),
}
def load_breadcrumbs(root: Path) -> dict[str, str]:
"""Parse workflow.md for [workflow-state:STATUS] blocks.
Returns {status: body_text}. Missing tags fall back to hardcoded
defaults so the hook always has something to say for built-in
statuses. Custom statuses without tags fall to generic breadcrumb
downstream (see build_breadcrumb).
"""
result = dict(_FALLBACK_BREADCRUMBS)
workflow = root / ".trellis" / "workflow.md"
if not workflow.is_file():
return result
try:
content = workflow.read_text(encoding="utf-8")
except OSError:
return result
for match in _TAG_RE.finditer(content):
status = match.group(1)
body = match.group(2).strip()
if body:
result[status] = body
return result
def build_breadcrumb(
task_id: Optional[str], status: str, templates: dict[str, str]
) -> str:
"""Build the <workflow-state>...</workflow-state> block.
- Known status (in templates or fallback) → detailed template body
- Unknown status (no tag + no fallback) → generic "refer to workflow.md"
- `no_task` pseudo-status (task_id is None) → header omits task info
"""
body = templates.get(status)
if body is None:
body = "Refer to workflow.md for current step."
header = f"Status: {status}" if task_id is None else f"Task: {task_id} ({status})"
return f"<workflow-state>\n{header}\n{body}\n</workflow-state>"
# ---------------------------------------------------------------------------
# Entry
# ---------------------------------------------------------------------------
def main() -> int:
try:
data = json.load(sys.stdin)
except (json.JSONDecodeError, ValueError):
data = {}
cwd_str = data.get("cwd") or os.getcwd()
cwd = Path(cwd_str)
root = find_trellis_root(cwd)
if root is None:
return 0 # not a Trellis project
templates = load_breadcrumbs(root)
task = get_active_task(root)
if task is None:
# No active task — still emit a breadcrumb nudging AI toward
# trellis-brainstorm + task.py create when user describes real work.
breadcrumb = build_breadcrumb(None, "no_task", templates)
else:
breadcrumb = build_breadcrumb(*task, templates=templates)
output = {
"hookSpecificOutput": {
"hookEventName": "UserPromptSubmit",
"additionalContext": breadcrumb,
}
}
print(json.dumps(output))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,577 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Session Start Hook - Inject structured context
"""
from __future__ import annotations
# IMPORTANT: Suppress all warnings FIRST
import warnings
warnings.filterwarnings("ignore")
import json
import os
import subprocess
import sys
from io import StringIO
from pathlib import Path
FIRST_REPLY_NOTICE = """<first-reply-notice>
On the first visible assistant reply in this session, begin with exactly one short Chinese sentence:
Trellis SessionStart 已注入workflow、当前任务状态、开发者身份、git 状态、active tasks、spec 索引已加载。
Then continue directly with the user's request. This notice is one-shot: do not repeat it after the first assistant reply in the same session.
</first-reply-notice>"""
# IMPORTANT: Force stdout to use UTF-8 on Windows
# This fixes UnicodeEncodeError when outputting non-ASCII characters
if sys.platform.startswith("win"):
import io as _io
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace") # type: ignore[union-attr]
elif hasattr(sys.stdout, "detach"):
sys.stdout = _io.TextIOWrapper(sys.stdout.detach(), encoding="utf-8", errors="replace") # type: ignore[union-attr]
def _has_curated_jsonl_entry(jsonl_path: Path) -> bool:
"""Return True iff jsonl has at least one row with a ``file`` field.
A freshly seeded jsonl only contains a ``{"_example": ...}`` row (no
``file`` key) — that is NOT "ready". Readiness requires at least one
curated entry. Matches the contract used by ``inject-subagent-context.py``.
"""
try:
for line in jsonl_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(row, dict) and row.get("file"):
return True
except (OSError, UnicodeDecodeError):
return False
return False
def should_skip_injection() -> bool:
"""Check if any platform's non-interactive flag is set."""
non_interactive_vars = [
"CLAUDE_NON_INTERACTIVE",
"QODER_NON_INTERACTIVE",
"CODEBUDDY_NON_INTERACTIVE",
"FACTORY_NON_INTERACTIVE",
"CURSOR_NON_INTERACTIVE",
"GEMINI_NON_INTERACTIVE",
"KIRO_NON_INTERACTIVE",
"COPILOT_NON_INTERACTIVE",
]
return any(os.environ.get(var) == "1" for var in non_interactive_vars)
def read_file(path: Path, fallback: str = "") -> str:
try:
return path.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError):
return fallback
def run_script(script_path: Path) -> str:
try:
if script_path.suffix == ".py":
# Add PYTHONIOENCODING to force UTF-8 in subprocess
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
cmd = [sys.executable, "-W", "ignore", str(script_path)]
else:
env = os.environ
cmd = [str(script_path)]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
timeout=5,
cwd=script_path.parent.parent.parent,
env=env,
)
return result.stdout if result.returncode == 0 else "No context available"
except (subprocess.TimeoutExpired, FileNotFoundError, PermissionError):
return "No context available"
def _normalize_task_ref(task_ref: str) -> str:
normalized = task_ref.strip()
if not normalized:
return ""
path_obj = Path(normalized)
if path_obj.is_absolute():
return str(path_obj)
normalized = normalized.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
if normalized.startswith("tasks/"):
return f".trellis/{normalized}"
return normalized
def _resolve_task_dir(trellis_dir: Path, task_ref: str) -> Path:
normalized = _normalize_task_ref(task_ref)
path_obj = Path(normalized)
if path_obj.is_absolute():
return path_obj
if normalized.startswith(".trellis/"):
return trellis_dir.parent / path_obj
return trellis_dir / "tasks" / path_obj
def _get_task_status(trellis_dir: Path) -> str:
"""Check current task status and return structured status string with explicit next action.
Returns a block with three fields:
- Status: current state
- Task: task identifier (when applicable)
- Next-Action: explicit skill/command/tool call the AI should invoke
"""
current_task_file = trellis_dir / ".current-task"
# Case 1: No active task — waiting for user to describe intent
if not current_task_file.is_file() or not current_task_file.read_text(encoding="utf-8").strip():
return (
"Status: NO ACTIVE TASK\n"
"Next-Action: After the user describes their intent, load skill `trellis-brainstorm` "
"to clarify requirements and create a task via `python3 ./.trellis/scripts/task.py create`.\n"
"Research reminder: for research-heavy tasks (comparing tools, reading external docs, "
"cross-platform surveys), spawn `trellis-research` sub-agents via the Task tool — "
"they persist findings to `{TASK_DIR}/research/*.md` and keep main context clean. "
"Do NOT do 10+ inline WebFetch/WebSearch in the main conversation."
)
task_ref = _normalize_task_ref(current_task_file.read_text(encoding="utf-8").strip())
# Case 2: Stale pointer — task dir was deleted
task_dir = _resolve_task_dir(trellis_dir, task_ref)
if not task_dir.is_dir():
return (
f"Status: STALE POINTER\nTask: {task_ref}\n"
f"Next-Action: Run `python3 ./.trellis/scripts/task.py finish` to clear the stale pointer, "
"then ask the user what to work on next."
)
# Read task.json
task_json_path = task_dir / "task.json"
task_data = {}
if task_json_path.is_file():
try:
task_data = json.loads(task_json_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, PermissionError):
pass
task_title = task_data.get("title", task_ref)
task_status = task_data.get("status", "unknown")
# Case 3: Task completed — time to archive
if task_status == "completed":
return (
f"Status: COMPLETED\nTask: {task_title}\n"
f"Next-Action: Load skill `trellis-update-spec` to capture learnings, "
f"then archive with `python3 ./.trellis/scripts/task.py archive {task_dir.name}`."
)
has_prd = (task_dir / "prd.md").is_file()
# Case 4: No PRD — still in Plan phase
if not has_prd:
return (
f"Status: PLANNING\nTask: {task_title}\n"
"Next-Action: Load skill `trellis-brainstorm` to clarify requirements with the user "
"and produce prd.md in the task directory.\n"
"Research reminder: when the task needs external research (tool comparison, docs, "
"conventions survey), spawn `trellis-research` sub-agents — don't WebFetch/WebSearch "
"inline in the main session. Findings go to `{task_dir}/research/*.md`; PRD only links to them."
)
# Case 4b: PRD exists but implement.jsonl has only seed (no curated entries) — Phase 1.3 gate
implement_jsonl = task_dir / "implement.jsonl"
if implement_jsonl.is_file() and not _has_curated_jsonl_entry(implement_jsonl):
return (
f"Status: PLANNING (Phase 1.3)\nTask: {task_title}\n"
"Next-Action: Curate `implement.jsonl` and `check.jsonl` with the spec + research files "
"the Phase 2 sub-agents will need. Only spec paths (`.trellis/spec/**/*.md`) and research "
"files (`{TASK_DIR}/research/*.md`) — no code paths. Run "
"`python3 ./.trellis/scripts/get_context.py --mode packages` to list available specs, "
"then edit the jsonl files or use `python3 ./.trellis/scripts/task.py add-context`. "
"See `.trellis/workflow.md` Phase 1.3 for details."
)
# Case 5: PRD + curated jsonl (or agent-less platform with no jsonl) — enter Execute phase
return (
f"Status: READY\nTask: {task_title}\n"
"Next required action: dispatch `trellis-implement` per Phase 2.1. "
"For agent-capable platforms, do NOT edit code in the main session. "
"After implementation, dispatch `trellis-check` per Phase 2.2 before reporting completion.\n"
"Sub-agent roster: `trellis-implement` (writes code), `trellis-check` (verifies + self-fixes), "
"`trellis-research` (persists findings to `research/*.md` — use when you'd otherwise do "
"multiple WebFetch/WebSearch inline)."
)
def _load_trellis_config(trellis_dir: Path) -> tuple:
"""Load Trellis config for session-start decisions.
Returns:
(is_mono, packages_dict, spec_scope, task_pkg, default_pkg)
"""
scripts_dir = trellis_dir / "scripts"
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
try:
from common.config import get_default_package, get_packages, get_spec_scope, is_monorepo # type: ignore[import-not-found]
from common.paths import get_current_task # type: ignore[import-not-found]
repo_root = trellis_dir.parent
is_mono = is_monorepo(repo_root)
packages = get_packages(repo_root) or {}
scope = get_spec_scope(repo_root)
# Get active task's package
task_pkg = None
current = get_current_task(repo_root)
if current:
task_json = repo_root / current / "task.json"
if task_json.is_file():
try:
data = json.loads(task_json.read_text(encoding="utf-8"))
if isinstance(data, dict):
tp = data.get("package")
if isinstance(tp, str) and tp:
task_pkg = tp
except (json.JSONDecodeError, OSError):
pass
default_pkg = get_default_package(repo_root)
return is_mono, packages, scope, task_pkg, default_pkg
except Exception:
return False, {}, None, None, None
def _check_legacy_spec(trellis_dir: Path, is_mono: bool, packages: dict) -> str | None:
"""Check for legacy spec directory structure in monorepo.
Returns warning message if legacy structure detected, None otherwise.
"""
if not is_mono or not packages:
return None
spec_dir = trellis_dir / "spec"
if not spec_dir.is_dir():
return None
# Check for legacy flat spec dirs (spec/backend/, spec/frontend/ with index.md)
has_legacy = False
for legacy_name in ("backend", "frontend"):
legacy_dir = spec_dir / legacy_name
if legacy_dir.is_dir() and (legacy_dir / "index.md").is_file():
has_legacy = True
break
if not has_legacy:
return None
# Check which packages are missing spec/<pkg>/ directory
missing = [
name for name in sorted(packages.keys())
if not (spec_dir / name).is_dir()
]
if not missing:
return None # All packages have spec dirs
if len(missing) == len(packages):
return (
f"[!] Legacy spec structure detected: found `spec/backend/` or `spec/frontend/` "
f"but no package-scoped `spec/<package>/` directories.\n"
f"Monorepo packages: {', '.join(sorted(packages.keys()))}\n"
f"Please reorganize: `spec/backend/` -> `spec/<package>/backend/`"
)
return (
f"[!] Partial spec migration detected: packages {', '.join(missing)} "
f"still missing `spec/<pkg>/` directory.\n"
f"Please complete migration for all packages."
)
def _resolve_spec_scope(
is_mono: bool,
packages: dict,
scope,
task_pkg: str | None,
default_pkg: str | None,
) -> set | None:
"""Resolve which packages should have their specs injected.
Returns:
Set of package names to include, or None for full scan.
"""
if not is_mono or not packages:
return None # Single-repo: full scan
if scope is None:
return None # No scope configured: full scan
if isinstance(scope, str) and scope == "active_task":
if task_pkg and task_pkg in packages:
return {task_pkg}
if default_pkg and default_pkg in packages:
return {default_pkg}
return None # Fallback to full scan
if isinstance(scope, list):
valid = set()
for entry in scope:
if entry in packages:
valid.add(entry)
else:
print(
f"Warning: spec_scope contains unknown package: {entry}, ignoring",
file=sys.stderr,
)
if valid:
# Warn if active task is out of scope
if task_pkg and task_pkg not in valid:
print(
f"Warning: active task package '{task_pkg}' is out of configured spec_scope",
file=sys.stderr,
)
return valid
# All entries invalid: fallback chain
print(
"Warning: all spec_scope entries invalid, falling back to task/default/full",
file=sys.stderr,
)
if task_pkg and task_pkg in packages:
return {task_pkg}
if default_pkg and default_pkg in packages:
return {default_pkg}
return None # Full scan
return None # Unknown scope type: full scan
def _extract_range(content: str, start_header: str, end_header: str) -> str:
"""Extract lines starting at `## start_header` up to (but excluding) `## end_header`.
Both parameters are full header lines WITHOUT the `## ` prefix (e.g. "Phase Index").
Returns empty string if start header is not found.
End header missing → extracts to end of file.
"""
lines = content.splitlines()
start: int | None = None
end: int = len(lines)
start_match = f"## {start_header}"
end_match = f"## {end_header}"
for i, line in enumerate(lines):
stripped = line.strip()
if start is None and stripped == start_match:
start = i
continue
if start is not None and stripped == end_match:
end = i
break
if start is None:
return ""
return "\n".join(lines[start:end]).rstrip()
def _build_workflow_overview(workflow_path: Path) -> str:
"""Inject the workflow guide for the session.
Contents:
1. Section index (all `## ` headings — navigation)
2. Phase Index section (rules, skill routing table, anti-rationalization table)
3. Phase 1/2/3 step-level details (the actual how-to for each step)
The meta sections (Core Principles / Trellis System / Workflow State
Breadcrumbs) are NOT injected — Core Principles is short prose the AI can
Read on demand; Trellis System lists reference commands duplicated in
step bodies; Breadcrumbs are consumed by the UserPromptSubmit hook.
Total budget: Phase Index ~2 KB + Phase 1/2/3 ~7 KB = ~9 KB.
"""
content = read_file(workflow_path)
if not content:
return "No workflow.md found"
out_lines = [
"# Development Workflow — Section Index",
"Full guide: .trellis/workflow.md (read on demand)",
"",
"## Table of Contents",
]
for line in content.splitlines():
if line.startswith("## "):
out_lines.append(line)
out_lines += ["", "---", ""]
# Extract Phase Index through the end of Phase 3 (before Breadcrumbs).
# Since sections appear in order Phase Index → Phase 1 → Phase 2 → Phase 3
# → Workflow State Breadcrumbs, a single range grab captures all four.
phases = _extract_range(
content, "Phase Index", "Workflow State Breadcrumbs"
)
if phases:
out_lines.append(phases)
return "\n".join(out_lines).rstrip()
def main():
if should_skip_injection():
sys.exit(0)
# Try platform-specific env vars, fallback to cwd
project_dir_env_vars = [
"CLAUDE_PROJECT_DIR",
"QODER_PROJECT_DIR",
"CODEBUDDY_PROJECT_DIR",
"FACTORY_PROJECT_DIR",
"CURSOR_PROJECT_DIR",
"GEMINI_PROJECT_DIR",
"KIRO_PROJECT_DIR",
"COPILOT_PROJECT_DIR",
]
project_dir = None
for var in project_dir_env_vars:
val = os.environ.get(var)
if val:
project_dir = Path(val).resolve()
break
if project_dir is None:
project_dir = Path(".").resolve()
trellis_dir = project_dir / ".trellis"
# Load config for scope filtering and legacy detection
is_mono, packages, scope_config, task_pkg, default_pkg = _load_trellis_config(trellis_dir)
allowed_pkgs = _resolve_spec_scope(is_mono, packages, scope_config, task_pkg, default_pkg)
output = StringIO()
output.write("""<session-context>
You are starting a new session in a Trellis-managed project.
Read and follow all instructions below carefully.
</session-context>
""")
output.write(FIRST_REPLY_NOTICE)
output.write("\n\n")
# Legacy migration warning
legacy_warning = _check_legacy_spec(trellis_dir, is_mono, packages)
if legacy_warning:
output.write(f"<migration-warning>\n{legacy_warning}\n</migration-warning>\n\n")
output.write("<current-state>\n")
context_script = trellis_dir / "scripts" / "get_context.py"
output.write(run_script(context_script))
output.write("\n</current-state>\n\n")
output.write("<workflow>\n")
output.write(_build_workflow_overview(trellis_dir / "workflow.md"))
output.write("\n</workflow>\n\n")
output.write("<guidelines>\n")
output.write(
"Project spec indexes are listed by path below. Each index contains a "
"**Pre-Development Checklist** listing the specific guideline files to "
"read before coding.\n\n"
"- If you're spawning an implement/check sub-agent, context is injected "
"automatically via `{task}/implement.jsonl` / `check.jsonl`. You do NOT "
"need to read these indexes yourself.\n"
"- For agent-capable platforms, do NOT edit code directly in the main "
"session; dispatch `trellis-implement` and `trellis-check` so JSONL "
"context is loaded by the sub-agents.\n\n"
)
# guides/ is cross-package thinking — always include inline (small, broadly useful)
guides_index = trellis_dir / "spec" / "guides" / "index.md"
if guides_index.is_file():
output.write("## guides (inlined — cross-package thinking guides)\n")
output.write(read_file(guides_index))
output.write("\n\n")
# Other spec indexes — paths only (main agent reads on demand;
# sub-agents get their specific specs via jsonl injection)
paths: list[str] = []
spec_dir = trellis_dir / "spec"
if spec_dir.is_dir():
for sub in sorted(spec_dir.iterdir()):
if not sub.is_dir() or sub.name.startswith("."):
continue
if sub.name == "guides":
continue # already inlined above
index_file = sub / "index.md"
if index_file.is_file():
# Flat spec dir (single-repo layer like spec/backend/)
paths.append(f".trellis/spec/{sub.name}/index.md")
else:
# Nested package dirs (monorepo: spec/<pkg>/<layer>/index.md)
# Apply scope filter
if allowed_pkgs is not None and sub.name not in allowed_pkgs:
continue
for nested in sorted(sub.iterdir()):
if not nested.is_dir():
continue
nested_index = nested / "index.md"
if nested_index.is_file():
paths.append(
f".trellis/spec/{sub.name}/{nested.name}/index.md"
)
if paths:
output.write("## Available spec indexes (read on demand)\n")
for p in paths:
output.write(f"- {p}\n")
output.write("\n")
output.write(
"Discover more via: "
"`python3 ./.trellis/scripts/get_context.py --mode packages`\n"
)
output.write("</guidelines>\n\n")
# Check task status and inject structured tag
task_status = _get_task_status(trellis_dir)
output.write(f"<task-status>\n{task_status}\n</task-status>\n\n")
output.write("""<ready>
Context loaded. Workflow index, project state, and guidelines are already injected above — do NOT re-read them.
When the user sends the first message, follow <task-status> and the workflow guide.
If a task is READY, execute its Next required action without asking whether to continue.
</ready>""")
result = {
"hookSpecificOutput": {
"hookEventName": "SessionStart",
"additionalContext": output.getvalue(),
}
}
# Output JSON - stdout is already configured for UTF-8
print(json.dumps(result, ensure_ascii=False), flush=True)
if __name__ == "__main__":
main()

219
.claude/hooks/statusline.py Normal file
View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Trellis StatusLine — project-level status display for Claude Code.
Reads Claude Code session JSON from stdin + Trellis task data from filesystem.
Outputs 1-2 lines:
With active task: [P1] Task title (status) + info line
Without task: info line only
Info line: model · ctx% · branch · duration · developer · tasks · rate limits
"""
from __future__ import annotations
import json
import re
import subprocess
import sys
from pathlib import Path
# Fix: Windows Python defaults to GBK encoding, which corrupts UTF-8
# characters like the middle dot (·). Wrap stdout/stderr with UTF-8.
if sys.platform == "win32":
for stream in (sys.stdout, sys.stderr):
reconfigure = getattr(stream, "reconfigure", None)
if callable(reconfigure):
reconfigure(encoding="utf-8", errors="replace")
def _read_text(path: Path) -> str:
try:
return path.read_text(encoding="utf-8").strip()
except (FileNotFoundError, PermissionError, OSError):
return ""
def _read_json(path: Path) -> dict:
text = _read_text(path)
if not text:
return {}
try:
return json.loads(text)
except (json.JSONDecodeError, ValueError):
return {}
def _normalize_task_ref(task_ref: str) -> str:
normalized = task_ref.strip()
if not normalized:
return ""
path_obj = Path(normalized)
if path_obj.is_absolute():
return str(path_obj)
normalized = normalized.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
if normalized.startswith("tasks/"):
return f".trellis/{normalized}"
return normalized
def _resolve_task_dir(trellis_dir: Path, task_ref: str) -> Path:
normalized = _normalize_task_ref(task_ref)
path_obj = Path(normalized)
if path_obj.is_absolute():
return path_obj
if normalized.startswith(".trellis/"):
return trellis_dir.parent / path_obj
return trellis_dir / "tasks" / path_obj
def _find_trellis_dir() -> Path | None:
"""Walk up from cwd to find .trellis/ directory."""
current = Path.cwd()
for parent in [current, *current.parents]:
candidate = parent / ".trellis"
if candidate.is_dir():
return candidate
return None
def _get_current_task(trellis_dir: Path) -> dict | None:
"""Load current task info. Returns dict with title/status/priority or None."""
task_ref = _normalize_task_ref(_read_text(trellis_dir / ".current-task"))
if not task_ref:
return None
# Resolve task directory
task_path = _resolve_task_dir(trellis_dir, task_ref)
task_data = _read_json(task_path / "task.json")
if not task_data:
return None
return {
"title": task_data.get("title") or task_data.get("name") or "unknown",
"status": task_data.get("status", "unknown"),
"priority": task_data.get("priority", "P2"),
}
def _count_active_tasks(trellis_dir: Path) -> int:
"""Count non-archived task directories with valid task.json."""
tasks_dir = trellis_dir / "tasks"
if not tasks_dir.is_dir():
return 0
count = 0
for d in tasks_dir.iterdir():
if d.is_dir() and d.name != "archive" and (d / "task.json").is_file():
count += 1
return count
def _get_developer(trellis_dir: Path) -> str:
content = _read_text(trellis_dir / ".developer")
if not content:
return "unknown"
for line in content.splitlines():
if line.startswith("name="):
return line[5:].strip()
return content.splitlines()[0].strip() or "unknown"
def _get_git_branch() -> str:
try:
result = subprocess.run(
["git", "branch", "--show-current"],
capture_output=True, text=True, timeout=3,
)
return result.stdout.strip() if result.returncode == 0 else ""
except (FileNotFoundError, subprocess.TimeoutExpired):
return ""
def _format_ctx_size(size: int) -> str:
if size >= 1_000_000:
return f"{size // 1_000_000}M"
if size >= 1_000:
return f"{size // 1_000}K"
return str(size)
def _format_duration(ms: int) -> str:
secs = ms // 1000
hours, remainder = divmod(secs, 3600)
mins = remainder // 60
if hours > 0:
return f"{hours}h{mins}m"
return f"{mins}m"
def main() -> None:
# Read Claude Code session JSON from stdin
try:
cc_data = json.loads(sys.stdin.read())
except (json.JSONDecodeError, ValueError):
cc_data = {}
trellis_dir = _find_trellis_dir()
SEP = " \033[90m·\033[0m "
# --- Trellis data ---
task = _get_current_task(trellis_dir) if trellis_dir else None
dev = _get_developer(trellis_dir) if trellis_dir else ""
task_count = _count_active_tasks(trellis_dir) if trellis_dir else 0
# --- CC session data ---
model = cc_data.get("model", {}).get("display_name", "?")
ctx_pct = int(cc_data.get("context_window", {}).get("used_percentage") or 0)
ctx_size = _format_ctx_size(cc_data.get("context_window", {}).get("context_window_size") or 0)
duration = _format_duration(cc_data.get("cost", {}).get("total_duration_ms") or 0)
branch = _get_git_branch()
# Avoid "Opus 4.6 (1M context) (1M)"
if re.search(r"\d+[KMG]\b", model, re.IGNORECASE):
model_label = model
else:
model_label = f"{model} ({ctx_size})"
# Context % with color
if ctx_pct >= 90:
ctx_color = "\033[31m"
elif ctx_pct >= 70:
ctx_color = "\033[33m"
else:
ctx_color = "\033[32m"
# Build info line: model · ctx · branch · duration · dev · tasks [· rate limits]
parts = [
model_label,
f"ctx {ctx_color}{ctx_pct}%\033[0m",
]
if branch:
parts.append(f"\033[35m{branch}\033[0m")
parts.append(duration)
if dev:
parts.append(f"\033[32m{dev}\033[0m")
if task_count:
parts.append(f"{task_count} task(s)")
five_hr = cc_data.get("rate_limits", {}).get("five_hour", {}).get("used_percentage")
if five_hr is not None:
parts.append(f"5h {int(five_hr)}%")
seven_day = cc_data.get("rate_limits", {}).get("seven_day", {}).get("used_percentage")
if seven_day is not None:
parts.append(f"7d {int(seven_day)}%")
info_line = SEP.join(parts)
# Output: task line (only if active) + info line
if task:
print(f"\033[36m[{task['priority']}]\033[0m {task['title']} \033[33m({task['status']})\033[0m")
print(info_line)
if __name__ == "__main__":
main()