Query Design Notes
ToLO 查询设计的关键是把 LLM 框架 API 识别成 source,把传统危险 API 识别成 sink,并描述中间 sanitizer。
这里的”查询”不是一次性脚本,而是可维护的规格包。规则应当能:
- 随着框架 API 变化扩展 source model
- 同时尽量少改 sink model
- 让研究重点固定在 ToLO 的 source class,而不是陷入每个 sink 的重复实现
这一页的结构
- 先修概念:规则不是漏洞证明
- 第一版规则目标
- 查询设计顺序
- Semgrep 适合做什么 + 最小规则草稿
- CodeQL 适合做什么 + 最小规则草稿
- 告警分级(High / Medium / Review)
- 维护与扩展
先修概念:规则不是漏洞证明
静态分析规则只能告诉你”代码里可能存在一条危险路径”。它不等于:
- 完整 exploit
- CVE 编号
- 必然可被攻击的证据
ToLO 学习站里的规则设计只用于教育、检测和 triage,不提供针对真实服务的攻击步骤。
第一版规则目标
第一版不追求覆盖全部框架。更合理的目标是:用少量旗舰框架和公开 CVE 验证 source/sink/sanitizer 三元组是否能稳定表达。能在已知案例上解释清楚,再逐步扩展到 long-tail 生态。
具体目标:
| 目标 | 验证 CVE |
|---|---|
| 找到 LLM 输出进入 code execution sink 的路径 | CVE-2023-29374, CVE-2023-36258, CVE-2023-39631, CVE-2024-5826 |
| 找到 LLM 输出进入 query construction 的路径 | CVE-2024-8309, CVE-2024-5826 |
| 找到 agent tool 参数缺少 schema/allowlist 的路径 | 通用 |
| 找到 output parser 结果直接进入敏感操作 的路径 | 通用 |
学习者可以把第一版目标记成三句话:
先找清楚 source再复用成熟 sink最后只承认匹配 sanitizer查询设计顺序
- 先从少量明确 source 开始:chat completion content、parsed output、agent action。
- 再接入高置信 sink:
eval、shell execution、database query、template rendering。直接 import CodeQL 标准库的对应 sink。 - 然后补 guard 识别:allowlist、schema validation、sandbox boundary、explicit approval。
- 最后根据公开案例调低误报,补充框架特定 API。
每新增一个 source model,要写测试
附带至少一个最小测试样例:
✓ source 进入 sink → 应报✓ source 经类型匹配 sanitizer 后 → 不应报✓ 普通用户输入 / 内部字符串 → 不应被错误归入 ToLO测试样例不需要是真实漏洞,但要覆盖规则意图。
Semgrep 适合做什么
Semgrep 适合先写局部模式,快速发现明显的数据传递和危险 API 使用。
适合场景
- 查找
shell=True、eval、exec、yaml.load、pickle.loads - 查找
cursor.execute(f"...")这类拼接 SQL - 查找框架 tool 函数里缺少 allowlist 的明显模式
- 生成 review queue,让人工快速定位值得写 CodeQL model 的框架 API
不适合场景
只要路径跨过 parser、helper wrapper、agent registry 或 callback,Semgrep 就容易漏报或误报。报告中应把 Semgrep 结果标为初筛,不是完整检测结论。
最小 Semgrep 规则草稿
规则 1: eval 接近 AIMessage.content
rules: - id: tolo-exec-llm-eval-direct pattern-either: - pattern: eval($X.content) - pattern: exec($X.content) - pattern: compile($X.content, ...) message: | LLM output `.content` may flow to code execution sink. If $X is an AIMessage / ChatCompletion / similar LLM response, this is a likely ToLO-Exec. languages: [python] severity: WARNING metadata: tolo_sink: Exec tolo_subclass: ToLO-Exec cwe: CWE-94规则 2: tool_call.arguments 进 subprocess
rules: - id: tolo-shell-tool-call-args pattern-either: - pattern: | $ARGS = json.loads($CALL.function.arguments) ... subprocess.run($ARGS[...], shell=True, ...) - pattern: | $ARGS = json.loads($CALL.function.arguments) ... os.system($ARGS[...]) message: "OpenAI tool_call arguments may flow to shell." languages: [python] severity: ERROR metadata: tolo_subclass: ToLO-Shell cwe: CWE-78规则 3: @tool 函数体内 subprocess.run(..., shell=True)
rules: - id: tolo-shell-in-tool-decorator pattern: | @tool def $F(...): ... subprocess.run(..., shell=True, ...) ... message: | Function decorated with @tool contains subprocess(shell=True). LLM-controlled args may inject shell commands. languages: [python] severity: ERROR metadata: tolo_subclass: ToLO-ShellSemgrep 限制
- 不能跨函数追踪。如果 tool 函数内调用 helper,helper 里调 sink,Semgrep 看不到完整链路。
- 不能精确判断类型。
$X.content可能是任何含.content属性的对象。 - 适合作为 “问题清单生成器”,不适合作为最终判定。
CodeQL 适合做什么
CodeQL 更适合表达跨函数数据流、source-to-sink 路径和 sanitizer。
CodeQL 规则建议分模块
ToLO-Source/ ├── LLMClientSource.qll OpenAI / Anthropic / Google / Cohere SDK 直接返回字段 ├── FrameworkSource.qll LangChain / LlamaIndex / Haystack 等包装对象 source ├── ParsedOutputStep.qll JSON / YAML / regex / parser 派生传播 ├── StructuredOutputStep.qll Pydantic / dataclass / function calling 字段传播 └── RAGSource.qll Document.page_content 等
ToLO-Sink/ └── 复用 CodeQL 标准库 + 框架特定 wrapper
ToLO-Sanitizer/ ├── SchemaBarrier.qll ├── AllowlistBarrier.qll ├── ParameterizedBarrier.qll ├── SafeCodecBarrier.qll └── CapabilityBarrier.qll新增框架通常只需要扩展 source 模块,不会扰动 sink 与 sanitizer 判定。
最小 CodeQL 草稿
LangChainSource.qll
import pythonimport semmle.python.dataflow.new.DataFlow
/** AIMessage.content 等 LangChain 框架包装字段 */class LangChainMessageContent extends DataFlow::Node { LangChainMessageContent() { exists(Attribute a | a.getAttr() in ["content", "tool_calls", "additional_kwargs"] and a.getObject().pointsTo().getClass().getName() in [ "AIMessage", "BaseMessage", "ChatMessage", "HumanMessage", "ToolMessage", "AIMessageChunk" ] and this.asExpr() = a ) }}
/** LangChain AgentAction 字段 */class LangChainAgentAction extends DataFlow::Node { LangChainAgentAction() { exists(Attribute a | a.getAttr() in ["tool_input", "tool", "log"] and a.getObject().pointsTo().getClass().getName() in ["AgentAction"] and this.asExpr() = a ) }}ToLOExecConfiguration.qll
import pythonimport semmle.python.dataflow.new.TaintTrackingimport LangChainSource
class ToLOExecConfig extends TaintTracking::Configuration { ToLOExecConfig() { this = "ToLOExecConfig" }
override predicate isSource(DataFlow::Node n) { n instanceof LangChainMessageContent or n instanceof LangChainAgentAction or n instanceof OpenAIResponseContent // 另一个 source 模块 }
override predicate isSink(DataFlow::Node n) { // 复用 CodeQL 标准 code-injection sink n instanceof CodeExecutionSink }
override predicate isAdditionalTaintStep(DataFlow::Node src, DataFlow::Node tgt) { // json.loads(x) → return value 传播 exists(API::Node json | json = API::moduleImport("json").getMember("loads") and src = json.getACall().getArg(0) and tgt = json.getACall()) or // ast.literal_eval(x) 也算 exists(API::Node lit | lit = API::moduleImport("ast").getMember("literal_eval") and src = lit.getACall().getArg(0) and tgt = lit.getACall()) }
override predicate isBarrier(DataFlow::Node n) { // 经过 numexpr.evaluate(..., global_dict={}) 算 safe-codec exists(Call c | c.getFunc().(Attribute).getName() = "evaluate" and c.getFunc().(Attribute).getObject().(Name).getId() = "numexpr" and exists(c.getANamedArg("global_dict")) and n.asExpr() = c.getArg(0)) }}主查询
import pythonimport semmle.python.dataflow.new.DataFlow::PathGraphimport ToLOExecConfiguration
from ToLOExecConfig cfg, DataFlow::PathNode src, DataFlow::PathNode sinkwhere cfg.hasFlowPath(src, sink)select sink, src, sink, "LLM 输出经数据流到达代码执行 sink (ToLO-Exec)"CodeQL 输出的告警(示意)
[ToLOExec] LLM 输出经数据流到达代码执行 sink
Source: app/chains.py:15 answer = chain.invoke({"question": q}) # answer.content matched LangChainMessageContent
Path: app/chains.py:15 → app/chains.py:17 (assignment) app/chains.py:17 → app/utils.py:42 (function call) app/utils.py:42 → app/utils.py:43 (assignment after json.loads) app/utils.py:43 → app/runtime.py:89 (passed to eval)
Sink: app/runtime.py:89 return eval(code) # matched CodeExecutionSink (CWE-94)
Missing guard: No C_SAFE^safe-codec or C_SAFE^capability on this path这条路径才是 ToLO 教学和 triage 的核心。
告警分级
第一版可以把结果分成三档:
High(可能是真实问题)
- 已知
S_LLMsource 到代码执行、shell、反序列化、数据库写入或 SSRF sink - 且没有 guard
- 且 sink 是高危(
exec/os.system/pickle.loads等)
→ 直接 triage,优先看。
Medium(配置相关)
- 路径完整但 sink 风险依赖配置:
- 数据库账号权限
- sandbox 配置
- tool enablement(
allow_dangerous_codeflag) - 网络出口策略
→ 需要查应用配置才能定结论。
Review(模型不确定)
- source 或 sanitizer 模型不确定
- 框架是 long-tail,可能未精确建模
- sanitizer 是自定义校验函数,语义未确认
→ 需要人工确认框架语义。
这种分级比单一 true/false 更实用,也能避免把研究阶段的 survey 结果写成确定漏洞。
维护与扩展
当框架升级时
新版 SDK 经常改字段名(如 OpenAI v0 → v1 的 message 结构变化)。维护策略:
- 保留旧字段名作为兼容(source 集合越宽越好,不会漏)
- 加新字段名对应新 SDK
- 测试在新旧版本都跑
当 sink 标准库扩展时
CodeQL 标准库会持续扩展 sink:
- 新 SDK 的危险方法
- 新发现的反射攻击面
- 新模板引擎
ToLO 规则不需要重写,自动受益于标准库更新。这是”sink 复用”设计的好处。
当发现新 ToLO 子类时
如果未来发现新 sink 语义(例如 LLM 输出影响 OS 资源限制、IPC、RPC 等),按以下步骤扩展:
- 验证它不能被现有七子类自然覆盖。
- 写至少一个公开 CVE 验证。
- 加
ToLO-X命名 + 对应 CWE。 - 加
C_SAFE类型匹配规则(可能新增一类 sanitizer)。
一个完整示例:用规则命中 CVE-2023-29374
目标
写规则命中 LLMMathChain 的 ToLO 路径。
Semgrep 草稿
rules: - id: tolo-exec-llmmathchain pattern-either: - pattern: | $CHAIN = LLMMathChain.from_llm(...) - pattern: | PythonREPL().run($X) - pattern: | $REPL = PythonREPL(...) ... $REPL.run($X) message: | LangChain LLMMathChain or PythonREPL path detected. LLM output may flow to exec(). See CVE-2023-29374. languages: [python] severity: ERROR metadata: tolo_subclass: ToLO-Exec reference_cve: CVE-2023-29374CodeQL 草稿
class LLMMathChainSink extends DataFlow::Node { LLMMathChainSink() { exists(Call c | c.getFunc().(Attribute).getName() = "run" and c.getFunc().(Attribute).getObject().pointsTo().getClass().getName() in ["PythonREPL", "PythonAstREPL"] and this.asExpr() = c.getArg(0)) }}
// 然后加入 ToLOExecConfig.isSink测试样例
# expected: REPORTfrom langchain.chains import LLMMathChainchain = LLMMathChain.from_llm(llm=ChatOpenAI())chain.run("compute 2024**2")# → 内部 PythonREPL.run(expr) → 命中# expected: NOT REPORT (修复后)from langchain.chains import LLMMathChainimport numexpr# 修复后 chain 内部用 numexpr.evaluate(expr, global_dict={})# isBarrier 识别 numexpr.evaluate 调用读完检查
判断下面两条规则哪条更接近 ToLOScanner:
- A. 找所有
eval(...)调用 - B. 找
AIMessage.content经过传播后进入eval(...),且中间没有 sandbox / capability
B 更接近。A 是传统危险 API 搜索,不能说明 source 是 LLM 输出。
下一步阅读
- Sources and Sinks:规则需要的 source / sink / sanitizer 集合定义。
- Predicate Rules:传播规则、
DataFlowvsTaintTracking取舍。 - Public Case Studies:用 5 个 CVE 验证你的规则草稿。