Skip to content

Query Design Notes

ToLO 查询设计的关键是把 LLM 框架 API 识别成 source,把传统危险 API 识别成 sink,并描述中间 sanitizer

这里的”查询”不是一次性脚本,而是可维护的规格包。规则应当能:

  • 随着框架 API 变化扩展 source model
  • 同时尽量少改 sink model
  • 让研究重点固定在 ToLO 的 source class,而不是陷入每个 sink 的重复实现

这一页的结构

  1. 先修概念:规则不是漏洞证明
  2. 第一版规则目标
  3. 查询设计顺序
  4. Semgrep 适合做什么 + 最小规则草稿
  5. CodeQL 适合做什么 + 最小规则草稿
  6. 告警分级(High / Medium / Review)
  7. 维护与扩展

先修概念:规则不是漏洞证明

静态分析规则只能告诉你”代码里可能存在一条危险路径”。它不等于:

  • 完整 exploit
  • CVE 编号
  • 必然可被攻击的证据

ToLO 学习站里的规则设计只用于教育、检测和 triage,不提供针对真实服务的攻击步骤

第一版规则目标

第一版不追求覆盖全部框架。更合理的目标是:用少量旗舰框架和公开 CVE 验证 source/sink/sanitizer 三元组是否能稳定表达。能在已知案例上解释清楚,再逐步扩展到 long-tail 生态。

具体目标:

目标验证 CVE
找到 LLM 输出进入 code execution sink 的路径CVE-2023-29374, CVE-2023-36258, CVE-2023-39631, CVE-2024-5826
找到 LLM 输出进入 query construction 的路径CVE-2024-8309, CVE-2024-5826
找到 agent tool 参数缺少 schema/allowlist 的路径通用
找到 output parser 结果直接进入敏感操作 的路径通用

学习者可以把第一版目标记成三句话:

先找清楚 source
再复用成熟 sink
最后只承认匹配 sanitizer

查询设计顺序

  1. 先从少量明确 source 开始:chat completion content、parsed output、agent action。
  2. 再接入高置信 sink:eval、shell execution、database query、template rendering。直接 import CodeQL 标准库的对应 sink。
  3. 然后补 guard 识别:allowlist、schema validation、sandbox boundary、explicit approval。
  4. 最后根据公开案例调低误报,补充框架特定 API。

每新增一个 source model,要写测试

附带至少一个最小测试样例:

✓ source 进入 sink → 应报
✓ source 经类型匹配 sanitizer 后 → 不应报
✓ 普通用户输入 / 内部字符串 → 不应被错误归入 ToLO

测试样例不需要是真实漏洞,但要覆盖规则意图。

Semgrep 适合做什么

Semgrep 适合先写局部模式,快速发现明显的数据传递和危险 API 使用。

适合场景

  • 查找 shell=Trueevalexecyaml.loadpickle.loads
  • 查找 cursor.execute(f"...") 这类拼接 SQL
  • 查找框架 tool 函数里缺少 allowlist 的明显模式
  • 生成 review queue,让人工快速定位值得写 CodeQL model 的框架 API

不适合场景

只要路径跨过 parser、helper wrapper、agent registry 或 callback,Semgrep 就容易漏报或误报。报告中应把 Semgrep 结果标为初筛,不是完整检测结论。

最小 Semgrep 规则草稿

规则 1: eval 接近 AIMessage.content

rules:
- id: tolo-exec-llm-eval-direct
pattern-either:
- pattern: eval($X.content)
- pattern: exec($X.content)
- pattern: compile($X.content, ...)
message: |
LLM output `.content` may flow to code execution sink.
If $X is an AIMessage / ChatCompletion / similar LLM response,
this is a likely ToLO-Exec.
languages: [python]
severity: WARNING
metadata:
tolo_sink: Exec
tolo_subclass: ToLO-Exec
cwe: CWE-94

规则 2: tool_call.arguments 进 subprocess

rules:
- id: tolo-shell-tool-call-args
pattern-either:
- pattern: |
$ARGS = json.loads($CALL.function.arguments)
...
subprocess.run($ARGS[...], shell=True, ...)
- pattern: |
$ARGS = json.loads($CALL.function.arguments)
...
os.system($ARGS[...])
message: "OpenAI tool_call arguments may flow to shell."
languages: [python]
severity: ERROR
metadata:
tolo_subclass: ToLO-Shell
cwe: CWE-78

规则 3: @tool 函数体内 subprocess.run(..., shell=True)

rules:
- id: tolo-shell-in-tool-decorator
pattern: |
@tool
def $F(...):
...
subprocess.run(..., shell=True, ...)
...
message: |
Function decorated with @tool contains subprocess(shell=True).
LLM-controlled args may inject shell commands.
languages: [python]
severity: ERROR
metadata:
tolo_subclass: ToLO-Shell

Semgrep 限制

  • 不能跨函数追踪。如果 tool 函数内调用 helper,helper 里调 sink,Semgrep 看不到完整链路。
  • 不能精确判断类型$X.content 可能是任何含 .content 属性的对象。
  • 适合作为 “问题清单生成器”,不适合作为最终判定。

CodeQL 适合做什么

CodeQL 更适合表达跨函数数据流、source-to-sink 路径和 sanitizer

CodeQL 规则建议分模块

ToLO-Source/
├── LLMClientSource.qll OpenAI / Anthropic / Google / Cohere SDK 直接返回字段
├── FrameworkSource.qll LangChain / LlamaIndex / Haystack 等包装对象 source
├── ParsedOutputStep.qll JSON / YAML / regex / parser 派生传播
├── StructuredOutputStep.qll Pydantic / dataclass / function calling 字段传播
└── RAGSource.qll Document.page_content 等
ToLO-Sink/
└── 复用 CodeQL 标准库 + 框架特定 wrapper
ToLO-Sanitizer/
├── SchemaBarrier.qll
├── AllowlistBarrier.qll
├── ParameterizedBarrier.qll
├── SafeCodecBarrier.qll
└── CapabilityBarrier.qll

新增框架通常只需要扩展 source 模块,不会扰动 sink 与 sanitizer 判定。

最小 CodeQL 草稿

LangChainSource.qll

import python
import semmle.python.dataflow.new.DataFlow
/** AIMessage.content 等 LangChain 框架包装字段 */
class LangChainMessageContent extends DataFlow::Node {
LangChainMessageContent() {
exists(Attribute a |
a.getAttr() in ["content", "tool_calls", "additional_kwargs"] and
a.getObject().pointsTo().getClass().getName() in [
"AIMessage", "BaseMessage", "ChatMessage",
"HumanMessage", "ToolMessage", "AIMessageChunk"
] and
this.asExpr() = a
)
}
}
/** LangChain AgentAction 字段 */
class LangChainAgentAction extends DataFlow::Node {
LangChainAgentAction() {
exists(Attribute a |
a.getAttr() in ["tool_input", "tool", "log"] and
a.getObject().pointsTo().getClass().getName() in ["AgentAction"] and
this.asExpr() = a
)
}
}

ToLOExecConfiguration.qll

import python
import semmle.python.dataflow.new.TaintTracking
import LangChainSource
class ToLOExecConfig extends TaintTracking::Configuration {
ToLOExecConfig() { this = "ToLOExecConfig" }
override predicate isSource(DataFlow::Node n) {
n instanceof LangChainMessageContent or
n instanceof LangChainAgentAction or
n instanceof OpenAIResponseContent // 另一个 source 模块
}
override predicate isSink(DataFlow::Node n) {
// 复用 CodeQL 标准 code-injection sink
n instanceof CodeExecutionSink
}
override predicate isAdditionalTaintStep(DataFlow::Node src, DataFlow::Node tgt) {
// json.loads(x) → return value 传播
exists(API::Node json |
json = API::moduleImport("json").getMember("loads") and
src = json.getACall().getArg(0) and
tgt = json.getACall())
or
// ast.literal_eval(x) 也算
exists(API::Node lit |
lit = API::moduleImport("ast").getMember("literal_eval") and
src = lit.getACall().getArg(0) and
tgt = lit.getACall())
}
override predicate isBarrier(DataFlow::Node n) {
// 经过 numexpr.evaluate(..., global_dict={}) 算 safe-codec
exists(Call c |
c.getFunc().(Attribute).getName() = "evaluate" and
c.getFunc().(Attribute).getObject().(Name).getId() = "numexpr" and
exists(c.getANamedArg("global_dict")) and
n.asExpr() = c.getArg(0))
}
}

主查询

import python
import semmle.python.dataflow.new.DataFlow::PathGraph
import ToLOExecConfiguration
from ToLOExecConfig cfg, DataFlow::PathNode src, DataFlow::PathNode sink
where cfg.hasFlowPath(src, sink)
select sink, src, sink, "LLM 输出经数据流到达代码执行 sink (ToLO-Exec)"

CodeQL 输出的告警(示意)

[ToLOExec] LLM 输出经数据流到达代码执行 sink
Source:
app/chains.py:15
answer = chain.invoke({"question": q})
# answer.content matched LangChainMessageContent
Path:
app/chains.py:15 → app/chains.py:17 (assignment)
app/chains.py:17 → app/utils.py:42 (function call)
app/utils.py:42 → app/utils.py:43 (assignment after json.loads)
app/utils.py:43 → app/runtime.py:89 (passed to eval)
Sink:
app/runtime.py:89
return eval(code)
# matched CodeExecutionSink (CWE-94)
Missing guard:
No C_SAFE^safe-codec or C_SAFE^capability on this path

这条路径才是 ToLO 教学和 triage 的核心

告警分级

第一版可以把结果分成三档:

High(可能是真实问题)

  • 已知 S_LLM source 到代码执行、shell、反序列化、数据库写入或 SSRF sink
  • 且没有 guard
  • 且 sink 是高危(exec / os.system / pickle.loads 等)

→ 直接 triage,优先看。

Medium(配置相关)

  • 路径完整但 sink 风险依赖配置:
    • 数据库账号权限
    • sandbox 配置
    • tool enablement(allow_dangerous_code flag)
    • 网络出口策略

→ 需要查应用配置才能定结论。

Review(模型不确定)

  • source 或 sanitizer 模型不确定
  • 框架是 long-tail,可能未精确建模
  • sanitizer 是自定义校验函数,语义未确认

→ 需要人工确认框架语义。

这种分级比单一 true/false 更实用,也能避免把研究阶段的 survey 结果写成确定漏洞。

维护与扩展

当框架升级时

新版 SDK 经常改字段名(如 OpenAI v0 → v1 的 message 结构变化)。维护策略:

  • 保留旧字段名作为兼容(source 集合越宽越好,不会漏)
  • 加新字段名对应新 SDK
  • 测试在新旧版本都跑

当 sink 标准库扩展时

CodeQL 标准库会持续扩展 sink:

  • 新 SDK 的危险方法
  • 新发现的反射攻击面
  • 新模板引擎

ToLO 规则不需要重写,自动受益于标准库更新。这是”sink 复用”设计的好处。

当发现新 ToLO 子类时

如果未来发现新 sink 语义(例如 LLM 输出影响 OS 资源限制、IPC、RPC 等),按以下步骤扩展:

  1. 验证它不能被现有七子类自然覆盖。
  2. 写至少一个公开 CVE 验证。
  3. ToLO-X 命名 + 对应 CWE。
  4. C_SAFE 类型匹配规则(可能新增一类 sanitizer)。

一个完整示例:用规则命中 CVE-2023-29374

目标

写规则命中 LLMMathChain 的 ToLO 路径。

Semgrep 草稿

rules:
- id: tolo-exec-llmmathchain
pattern-either:
- pattern: |
$CHAIN = LLMMathChain.from_llm(...)
- pattern: |
PythonREPL().run($X)
- pattern: |
$REPL = PythonREPL(...)
...
$REPL.run($X)
message: |
LangChain LLMMathChain or PythonREPL path detected.
LLM output may flow to exec(). See CVE-2023-29374.
languages: [python]
severity: ERROR
metadata:
tolo_subclass: ToLO-Exec
reference_cve: CVE-2023-29374

CodeQL 草稿

class LLMMathChainSink extends DataFlow::Node {
LLMMathChainSink() {
exists(Call c |
c.getFunc().(Attribute).getName() = "run" and
c.getFunc().(Attribute).getObject().pointsTo().getClass().getName()
in ["PythonREPL", "PythonAstREPL"] and
this.asExpr() = c.getArg(0))
}
}
// 然后加入 ToLOExecConfig.isSink

测试样例

# expected: REPORT
from langchain.chains import LLMMathChain
chain = LLMMathChain.from_llm(llm=ChatOpenAI())
chain.run("compute 2024**2")
# → 内部 PythonREPL.run(expr) → 命中
# expected: NOT REPORT (修复后)
from langchain.chains import LLMMathChain
import numexpr
# 修复后 chain 内部用 numexpr.evaluate(expr, global_dict={})
# isBarrier 识别 numexpr.evaluate 调用

读完检查

判断下面两条规则哪条更接近 ToLOScanner:

  • A. 找所有 eval(...) 调用
  • B. 找 AIMessage.content 经过传播后进入 eval(...),且中间没有 sandbox / capability

B 更接近。A 是传统危险 API 搜索,不能说明 source 是 LLM 输出。

下一步阅读