Sources and Sinks
ToLO 检测建立在标准污点分析框架上:三个谓词集合(source、sink、sanitizer)加传播规则。本节给出三个集合的具体划分,可直接映射到 CodeQL DataFlow::Configuration 或 TaintTracking::Configuration。
这一页的重点是把”LLM 输出不可信”变成可审查的集合定义。不要把 source 写成”所有字符串”,也不要把 sanitizer 写成”任何校验函数”。ToLO 的有效性来自边界清晰:
- source 必须能追溯到 LLM 或 LLM-influenceable 数据。
- sink 必须能影响被保护对象。
- sanitizer 必须与 sink 语义匹配。
这一页的结构
- 先修概念:污点传播
- Source 集合
S_LLM(五子集)+ 每子集的代码模式 - Source 建模优先级
- Sink 集合
S_DANGER(七类)+ 复用 CodeQL 标准库 - Sanitizer 集合
C_SAFE(五类)+ 类型匹配判定 - 三个容易混淆的判断
- 告警解释模板
先修概念:污点传播
污点传播可以理解为给数据贴标签:
- 某个值来自不可信来源,就贴上
tainted标签。 - 只要它被赋值、拼接、放进对象、解析成字段,标签就跟着走。
- 如果它经过有效 sanitizer,标签才可能被移除。
最小例子:
content = ai_message.content # tainted (从 LLM 输出来)obj = json.loads(content) # obj 仍然 tainted (json.loads 不清洗)path = obj["path"] # path 仍然 tainted (dict 取字段)open(path) # tainted 值进入 sink → 报告json.loads 没有移除路径风险,所以污染继续传播。
下一节展示更多传播规则的细节:容器传播、字符串变换、框架分派 → 见 Predicate Rules。
Source 集合 S_LLM(五子集)
S_LLM^direct — 模型直接生成的文本
直接 LLM API 客户端的返回字段。
| 框架 | 字段路径 |
|---|---|
| OpenAI Python | response.choices[0].message.content |
| OpenAI Python | response.choices[0].delta.content(streaming) |
| OpenAI Python | response.output_text(新 Responses API) |
| Anthropic Python | response.content[i].text(text blocks) |
| Google GenAI | response.text / response.candidates[0].content.parts[0].text |
| Cohere | response.text |
| Mistral | response.choices[0].message.content |
| Ollama / OpenAI 兼容 | 同 OpenAI 字段 |
CodeQL 建模思路:把上述 SDK 的客户端方法返回值的 .content / .text 等属性访问标为 source。需要识别 SDK 类型(openai.OpenAI、anthropic.Anthropic、google.generativeai.GenerativeModel 等)。
S_LLM^framework — 编排框架包装后的对象字段
编排框架把模型输出包装成对象后的字段。
| 框架 | 字段路径 |
|---|---|
| LangChain | AIMessage.content |
| LangChain | AIMessage.tool_calls[i]["args"] |
| LangChain | AIMessage.additional_kwargs |
| LangChain | AgentAction.tool_input(dict or str) |
| LangChain | AgentFinish.return_values["output"] |
| LangChain | ChatGeneration.text |
| LlamaIndex | Response.response |
| LlamaIndex | ChatResponse.message.content |
| LlamaIndex | AgentChatResponse.response |
| Haystack | Answer.data / GeneratedAnswer.data |
| AutoGen | Agent 消息列表内 content |
| CrewAI | Task.output.raw / result |
| Semantic Kernel | FunctionResult.value |
| LiteLLM | 与 OpenAI 同接口 |
| MCP | tools/call 返回的 content[i].text |
CodeQL 建模思路:为每个框架的 message / generation / agent action / response 类建立 type model,然后把这些类的 content / output / args 属性访问标为 source。
S_LLM^parsed — 从前两类经字符串解析得到的派生字段
显式的 propagation step,因为它们通常需要 dataflow 标记中间转换:
data = json.loads(ai_message.content) # ← 显式传播obj = re.findall(r"...", ai_message.content)yaml_obj = yaml.safe_load(response.text)parsed = MyOutputParser.parse(ai_message.content)CodeQL 建模思路:在 TaintTracking::Configuration 中加 isAdditionalTaintStep 规则,把 json.loads(x)、yaml.safe_load(x)、re.search(x, y).group(...)、自定义 parser 调用都建成 x → return value 的污点传播。
S_LLM^structured — 结构化输出与 function call 的字段
Pydantic / dataclass 实例的所有字段:
| 框架 | 字段路径 |
|---|---|
| OpenAI structured output | client.beta.chat.completions.parse(response_format=X) → 实例字段 |
| Anthropic tool use | response.content[i].input (dict) |
LangChain with_structured_output | llm.with_structured_output(X).invoke(...) → 实例 |
| Instructor | client.chat.completions.create(response_model=X) → 实例 |
| Outlines | outlines.generate.json(model, X) → 实例 |
| Marvin | marvin.cast(text, target=X) → 实例 |
重点:实例的所有字段默认都是 tainted,即使 schema 限制了形状。详见 §“为什么 Pydantic 不构成 sanitizer”。
S_LLM^rag — RAG 检索器返回的文档内容字段
| 框架 | 字段路径 |
|---|---|
| LangChain | Document.page_content |
| LangChain | Document.metadata(也是 attacker-influenceable) |
| LlamaIndex | NodeWithScore.node.text |
| LlamaIndex | NodeWithScore.node.metadata |
| Haystack | Document.content / Document.meta |
| 任意 vector store | vector_store.similarity_search(...) 的返回值 |
攻击者可通过 RAG 投毒通道(C3)控制,与 S_LLM^direct 等价。
子集之间互相转化
五个子集可以互相转化:
S_LLM^rag 进入 prompt → 影响 S_LLM^directS_LLM^direct 被框架包装 → 变成 S_LLM^frameworkS_LLM^framework 经 parser → 变成 S_LLM^parsedS_LLM^direct 经 function calling → 变成 S_LLM^structured静态分析规则需要把这些转换当作传播步骤,而不是把它们误认为清洗。
┌──────────────┐ │ S_LLM^direct │ └──────┬───────┘ │ ┌─────────────┼─────────────┐ ▼ ▼ ▼ ┌──────────────┐ ┌──────────┐ ┌────────────────┐ │S_LLM^framework│ │S_LLM^parsed│ │S_LLM^structured│ └───────┬──────┘ └──────────┘ └────────────────┘ │ ▼ parser/dispatch (re-enter)Source 建模优先级
第一层应覆盖直接 SDK:OpenAI、Anthropic、Google、Cohere、Mistral、Ollama 等 client 的返回字段。这一层对象少、语义稳定,是跨框架 baseline。
第二层覆盖旗舰框架:LangChain Python、LangChain.js、LlamaIndex Python 等。它们的 message、generation、agent action、tool call 包装层复杂,但生态影响大。
第三层覆盖 long-tail 框架:Haystack、AutoGen、CrewAI、Semantic Kernel、LiteLLM、Guidance、DSPy、Marvin 等。可以先做浅层 source 标记,再通过公开案例和人工 review 加深。
五个 source 子集怎么记
| 子集 | 白话解释 | 例子 |
|---|---|---|
direct | 直接 SDK 返回 | OpenAI / Anthropic response 字段 |
framework | 框架包装对象 | AIMessage.content、AgentAction.tool_input |
parsed | 从输出解析出来 | json.loads(ai.content)["path"] |
structured | 结构化输出字段 | Pydantic model 的字段 |
rag | 检索返回内容 | Document.page_content、metadata |
不要被符号吓到。它们只是说明:LLM 相关的不可信数据不只在一个对象里,而是在多个框架层之间流动。
为什么 Pydantic 不构成 sanitizer
这一点经常被误解,单独说明。
Pydantic 只验证类型与结构(str 是 str、字段存在、长度在范围内),它不验证内容语义。一个被验证为合法 str 的字段,其字符串内容仍可包含任意攻击 payload。
判定规则
只有当字段使用 Literal[...] / Enum / Annotated[str, StringConstraints(pattern=...)] 等约束时,才视为 sanitizer 候选;单纯的 str 字段不构成 sanitizer。
这一判定偏向 recall —— 可能产生 false positive(应用做了自实现校验但我们没识别),但 false negative 的代价更高。
最小例子
class Action(BaseModel): path: str这个 model 只说明 path 是字符串。"../../.env" 也是字符串,所以它不会阻止路径风险。
更强一点:
class Action(BaseModel): action: Literal["summarize", "search"]这里 action 被限制在两个值内,才有可能成为 schema sanitizer 的一部分。
CodeQL 表达方式
predicate isSchemaConstrainedField(Field f) { // Literal[...] 注解 f.getAnnotation().hasName("Literal") or // Enum 类型 f.getType().getABaseType*().hasQualifiedName("enum", "Enum") or // Annotated[str, StringConstraints(pattern=...)] exists(Annotation a | a = f.getAnnotation() and a.hasName("Annotated") and a.hasArgument(_, "StringConstraints"))}Sink 集合 S_DANGER(七类)
对应 Core ToLO Patterns 的七子类。这一集合不需要 LLM-specific 扩展 —— 经典污点分析已穷尽。
直接复用 CodeQL 标准库:
| ToLO 子类 | CodeQL 复用 query/library |
|---|---|
ToLO-Exec | python/code-injection |
ToLO-Shell | python/command-line-injection |
ToLO-SQL | python/sql-injection |
ToLO-Path | python/path-injection |
ToLO-SSRF | python/server-side-request-forgery |
ToLO-Deser | python/unsafe-deserialization |
ToLO-Template | python/template-injection(部分需自建) |
JS / TS 等其他语言的 sink 集合复用同样的 CodeQL 标准库。
设计原则:Sink 复用
Sink 端不发明,把 novelty 完全压在 source 端。这样设计有几个收益:
- baseline 对照实验干净:默认 CodeQL 与 ToLOScanner 的差异完全来自 source 定义,而不是混杂着新 sink。
- 规则解释更容易:报告可以说”这是传统 code injection sink,但 source 是 LLM output”,而不是重新命名所有危险 API。这样既尊重已有 CWE,又突出 ToLO 的 source 端贡献。
- 维护成本低:CodeQL 标准库的 sink 规则一直在更新(新增 SDK、新增危险 API),ToLO 不需要重新发明这部分。
Sink 的现实样子
初学者可以把 sink 集合理解成”真实世界会被影响的操作”:
- 执行代码 (eval / exec / compile / PythonREPL)
- 跑命令 (subprocess shell=True / os.system / os.popen)
- 查数据库 (cursor.execute / pandas.read_sql / SQLAlchemy text)
- 读写文件 (open / Path.read|write / shutil)
- 发网络请求 (requests / httpx / urllib)
- 渲染模板 (Jinja2 Template / Environment.from_string)
- 恢复对象 (pickle.loads / yaml.load / torch.load)
这些操作本身不一定错;错在它们接收了未受控的 LLM 输出。
Sanitizer 集合 C_SAFE(五类)
C_SAFE^schema
Pydantic Strict 模式 + Literal / Enum / 受约束类型;JSON Schema 的 additionalProperties: false + 枚举或正则字段。
可识别代码模式:
# Pydantic v2class X(BaseModel): model_config = ConfigDict(strict=True, extra="forbid") action: Literal["a", "b"] name: Annotated[str, StringConstraints(pattern=r"^[a-z]+$")]
# JSON Schema{ "type": "object", "additionalProperties": False, "properties": { "action": {"type": "string", "enum": ["a", "b"]}, },}C_SAFE^allowlist
显式取值允许列表:
# Tool name allowlistif tool_name not in ALLOWED_TOOLS: raise
# Path root jail (先 resolve 再 is_relative_to)target = (ROOT / user_path).resolve()if not target.is_relative_to(ROOT): raise
# URL scheme + host allowlistu = urlparse(url)if u.scheme not in {"http", "https"}: raiseif u.hostname not in ALLOWED_HOSTS: raise
# SQL table mappingreal_table = ALLOWED_TABLES[llm_table]if llm_column not in ALLOWED_COLUMNS[llm_table]: raiseC_SAFE^parameterized
参数化下游调用:
# SQLcursor.execute("SELECT * FROM t WHERE id = %s", (uid,)) # ✓
# Shellsubprocess.run(["git", "log", branch], shell=False) # ✓
# HTTPrequests.get(BASE_URL, params={"q": q}) # ✓
# Templateenv = SandboxedEnvironment(autoescape=True)tpl = env.from_string(FIXED_TEMPLATE) # ← 模板字符串固定tpl.render(name=llm_value) # ✓C_SAFE^safe-codec
安全编解码:
json.loads(x) # 替代 pickle.loadsyaml.safe_load(x) # 替代 yaml.loadast.literal_eval(x) # 替代 eval (literal)numexpr.evaluate(x, global_dict={}, local_dict={...}) # 替代 eval (math)torch.load(path, weights_only=True) # 替代 torch.load (default)C_SAFE^capability
能力门控:执行 LLM 指定操作前检查是否在当前会话 capability 集合中(参考 IsolateGPT、CaMeL 的设计)。
# 会话级 capabilityclass Session: allowed_tools: set[str] allowed_paths: list[Path] allowed_hosts: set[str]
# 在 sink 前检查if tool_name not in session.allowed_tools: raise PermissionError()
# 容器化 sandboxdocker.containers.run(..., network_disabled=True, read_only=True, ...)
# 最小权限数据库账号RO_DB = create_engine("postgresql://readonly_user@db/app")主流框架几乎缺失 —— 这一类目前在检测端没有稳定信号可识别;defense 端建议引入,但 scanner 难以验证是否真的封住了什么。
Sanitizer 必须类型匹配
Sanitizer 不是越多越好,而是必须能切断相应 sink 的语义风险。
yaml.safe_load可以处理反序列化风险,但它解出的字符串仍可继续污染 SQL 或 shell。subprocess.run([...], shell=False)能降低 shell injection,但不能证明命令本身被授权。SandboxedEnvironment能限制模板执行能力,但如果模板字符串来自 LLM,仍需确认 sandbox 是否覆盖目标风险。
三个容易混淆的判断
| 代码行为 | 是否 sanitizer | 原因 |
|---|---|---|
json.loads(llm_text) | 通常不是 | 只解析结构,不限制字段语义 |
if tool not in ALLOWED: raise | 通常是 | 明确限制工具集合(C_SAFE^allowlist) |
print(llm_text) | 不是 sink | 只是展示,不是敏感操作 |
len(text) < 1000 | 不是 sanitizer | 长度对绝大多数 sink 无效 |
re.match(r"^[a-z]+$", x) | 可能是(允许字符严格) | 对 SQL 表名 / shell exec 名有效 |
text.replace("'", "''") | 错配,不算 sanitizer | SQL escape 在现代驱动里不安全,要用参数化 |
html.escape(text) | 只对 HTML XSS 是 sanitizer | 对 SQL / shell / path / URL 不算 |
ToLO 谓词(集合视图)
一个程序点对 (p_src, p_sink) 触发 ToLO,当且仅当存在污点路径 π: p_src ↝ p_sink,满足:
p_src ∈ S_LLM^{direct, framework, parsed, structured, rag} ANDp_sink ∈ ToLO-{Deser, Exec, Shell, SQL, Path, SSRF, Template} 对应 sink ANDπ 上不存在中间节点 q ∈ C_SAFE 对相应数据做了 类型匹配 的清洗“与 sink 类型匹配”是必要的 —— 对 SQL sink 用 html.escape 不算 sanitize;对路径 sink 用 SQL parameterization 不算 sanitize。
详细的传播规则、容器传播、字符串变换、跨函数 propagation 见 Predicate Rules。
告警解释模板
静态分析报告建议按四段展示:
┌──────────────────────────────────────────────────────┐│ ⚠ ToLO-Path 告警 │├──────────────────────────────────────────────────────┤│ Source: app/agent.py:42 ││ <S_LLM^framework> AIMessage.tool_calls[0]["args"] ││ ││ Path: ││ 42 → 43: args = json.loads(message.tool_calls[0]["args"]) ││ 43 → 44: path = args["path"] ││ 44 → 47: target = Path(path) ││ ││ Sink: app/agent.py:47 ││ <ToLO-Path> Path(...).read_text() ││ 对应被保护对象: 文件系统 ││ ││ Missing guard: ││ 无 C_SAFE^allowlist (路径未经 resolve + is_relative_to 检查) ││ 无 C_SAFE^capability (会话权限未检查) ││ ││ 推荐修复: ││ - 引入 Path((ROOT / path).resolve()).is_relative_to(ROOT) ││ - 或者把 path 改为 Literal[...] 文件 ID 映射 │└──────────────────────────────────────────────────────┘这个模板能避免报告变成单纯 API 列表,也方便和公开 CVE 案例互相校验。
读完检查
判断下面路径:
S_LLM^structured.name -> f"SELECT * FROM {name}" -> db.execute(...)如果 name 只是 Pydantic str 字段,没有表名 allowlist,这仍然是 ToLO-SQL 候选路径。结构化输出不是自动 sanitizer。
如果 name 是 Literal["users","orders"],这才是 C_SAFE^schema 加上隐式 allowlist —— 安全。
下一步阅读
- ToLO 谓词与传播规则:把上面集合写成数据流谓词,详细处理传播步骤。
- Query Design Notes:把规则规格落到 CodeQL / Semgrep 实现。