Skip to content

Sources and Sinks

ToLO 检测建立在标准污点分析框架上:三个谓词集合(source、sink、sanitizer)加传播规则。本节给出三个集合的具体划分,可直接映射到 CodeQL DataFlow::ConfigurationTaintTracking::Configuration

这一页的重点是把”LLM 输出不可信”变成可审查的集合定义。不要把 source 写成”所有字符串”,也不要把 sanitizer 写成”任何校验函数”。ToLO 的有效性来自边界清晰:

  • source 必须能追溯到 LLM 或 LLM-influenceable 数据。
  • sink 必须能影响被保护对象。
  • sanitizer 必须与 sink 语义匹配。

这一页的结构

  1. 先修概念:污点传播
  2. Source 集合 S_LLM(五子集)+ 每子集的代码模式
  3. Source 建模优先级
  4. Sink 集合 S_DANGER(七类)+ 复用 CodeQL 标准库
  5. Sanitizer 集合 C_SAFE(五类)+ 类型匹配判定
  6. 三个容易混淆的判断
  7. 告警解释模板

先修概念:污点传播

污点传播可以理解为给数据贴标签:

  • 某个值来自不可信来源,就贴上 tainted 标签。
  • 只要它被赋值、拼接、放进对象、解析成字段,标签就跟着走
  • 如果它经过有效 sanitizer,标签才可能被移除

最小例子:

content = ai_message.content # tainted (从 LLM 输出来)
obj = json.loads(content) # obj 仍然 tainted (json.loads 不清洗)
path = obj["path"] # path 仍然 tainted (dict 取字段)
open(path) # tainted 值进入 sink → 报告

json.loads 没有移除路径风险,所以污染继续传播

下一节展示更多传播规则的细节:容器传播、字符串变换、框架分派 → 见 Predicate Rules

Source 集合 S_LLM(五子集)

S_LLM^direct — 模型直接生成的文本

直接 LLM API 客户端的返回字段

框架字段路径
OpenAI Pythonresponse.choices[0].message.content
OpenAI Pythonresponse.choices[0].delta.content(streaming)
OpenAI Pythonresponse.output_text(新 Responses API)
Anthropic Pythonresponse.content[i].text(text blocks)
Google GenAIresponse.text / response.candidates[0].content.parts[0].text
Cohereresponse.text
Mistralresponse.choices[0].message.content
Ollama / OpenAI 兼容同 OpenAI 字段

CodeQL 建模思路:把上述 SDK 的客户端方法返回值的 .content / .text 等属性访问标为 source。需要识别 SDK 类型(openai.OpenAIanthropic.Anthropicgoogle.generativeai.GenerativeModel 等)。

S_LLM^framework — 编排框架包装后的对象字段

编排框架把模型输出包装成对象后的字段

框架字段路径
LangChainAIMessage.content
LangChainAIMessage.tool_calls[i]["args"]
LangChainAIMessage.additional_kwargs
LangChainAgentAction.tool_input(dict or str)
LangChainAgentFinish.return_values["output"]
LangChainChatGeneration.text
LlamaIndexResponse.response
LlamaIndexChatResponse.message.content
LlamaIndexAgentChatResponse.response
HaystackAnswer.data / GeneratedAnswer.data
AutoGenAgent 消息列表内 content
CrewAITask.output.raw / result
Semantic KernelFunctionResult.value
LiteLLM与 OpenAI 同接口
MCPtools/call 返回的 content[i].text

CodeQL 建模思路:为每个框架的 message / generation / agent action / response 类建立 type model,然后把这些类的 content / output / args 属性访问标为 source。

S_LLM^parsed — 从前两类经字符串解析得到的派生字段

显式的 propagation step,因为它们通常需要 dataflow 标记中间转换:

data = json.loads(ai_message.content) # ← 显式传播
obj = re.findall(r"...", ai_message.content)
yaml_obj = yaml.safe_load(response.text)
parsed = MyOutputParser.parse(ai_message.content)

CodeQL 建模思路:在 TaintTracking::Configuration 中加 isAdditionalTaintStep 规则,把 json.loads(x)yaml.safe_load(x)re.search(x, y).group(...)、自定义 parser 调用都建成 x → return value 的污点传播。

S_LLM^structured — 结构化输出与 function call 的字段

Pydantic / dataclass 实例的所有字段:

框架字段路径
OpenAI structured outputclient.beta.chat.completions.parse(response_format=X) → 实例字段
Anthropic tool useresponse.content[i].input (dict)
LangChain with_structured_outputllm.with_structured_output(X).invoke(...) → 实例
Instructorclient.chat.completions.create(response_model=X) → 实例
Outlinesoutlines.generate.json(model, X) → 实例
Marvinmarvin.cast(text, target=X) → 实例

重点:实例的所有字段默认都是 tainted,即使 schema 限制了形状。详见 §“为什么 Pydantic 不构成 sanitizer”。

S_LLM^rag — RAG 检索器返回的文档内容字段

框架字段路径
LangChainDocument.page_content
LangChainDocument.metadata(也是 attacker-influenceable)
LlamaIndexNodeWithScore.node.text
LlamaIndexNodeWithScore.node.metadata
HaystackDocument.content / Document.meta
任意 vector storevector_store.similarity_search(...) 的返回值

攻击者可通过 RAG 投毒通道(C3)控制,S_LLM^direct 等价

子集之间互相转化

五个子集可以互相转化:

S_LLM^rag 进入 prompt → 影响 S_LLM^direct
S_LLM^direct 被框架包装 → 变成 S_LLM^framework
S_LLM^framework 经 parser → 变成 S_LLM^parsed
S_LLM^direct 经 function calling → 变成 S_LLM^structured

静态分析规则需要把这些转换当作传播步骤,而不是把它们误认为清洗

┌──────────────┐
│ S_LLM^direct │
└──────┬───────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌────────────────┐
│S_LLM^framework│ │S_LLM^parsed│ │S_LLM^structured│
└───────┬──────┘ └──────────┘ └────────────────┘
parser/dispatch (re-enter)

Source 建模优先级

第一层应覆盖直接 SDK:OpenAI、Anthropic、Google、Cohere、Mistral、Ollama 等 client 的返回字段。这一层对象少、语义稳定,是跨框架 baseline。

第二层覆盖旗舰框架:LangChain Python、LangChain.js、LlamaIndex Python 等。它们的 message、generation、agent action、tool call 包装层复杂,但生态影响大。

第三层覆盖 long-tail 框架:Haystack、AutoGen、CrewAI、Semantic Kernel、LiteLLM、Guidance、DSPy、Marvin 等。可以先做浅层 source 标记,再通过公开案例和人工 review 加深。

五个 source 子集怎么记

子集白话解释例子
direct直接 SDK 返回OpenAI / Anthropic response 字段
framework框架包装对象AIMessage.contentAgentAction.tool_input
parsed从输出解析出来json.loads(ai.content)["path"]
structured结构化输出字段Pydantic model 的字段
rag检索返回内容Document.page_contentmetadata

不要被符号吓到。它们只是说明:LLM 相关的不可信数据不只在一个对象里,而是在多个框架层之间流动

为什么 Pydantic 不构成 sanitizer

这一点经常被误解,单独说明

Pydantic 只验证类型结构(strstr、字段存在、长度在范围内),它不验证内容语义。一个被验证为合法 str 的字段,其字符串内容仍可包含任意攻击 payload。

判定规则

只有当字段使用 Literal[...] / Enum / Annotated[str, StringConstraints(pattern=...)] 等约束时,才视为 sanitizer 候选;单纯的 str 字段不构成 sanitizer

这一判定偏向 recall —— 可能产生 false positive(应用做了自实现校验但我们没识别),但 false negative 的代价更高。

最小例子

class Action(BaseModel):
path: str

这个 model 只说明 path 是字符串。"../../.env" 也是字符串,所以它不会阻止路径风险

更强一点:

class Action(BaseModel):
action: Literal["summarize", "search"]

这里 action 被限制在两个值内,才有可能成为 schema sanitizer 的一部分。

CodeQL 表达方式

predicate isSchemaConstrainedField(Field f) {
// Literal[...] 注解
f.getAnnotation().hasName("Literal") or
// Enum 类型
f.getType().getABaseType*().hasQualifiedName("enum", "Enum") or
// Annotated[str, StringConstraints(pattern=...)]
exists(Annotation a |
a = f.getAnnotation() and
a.hasName("Annotated") and
a.hasArgument(_, "StringConstraints"))
}

Sink 集合 S_DANGER(七类)

对应 Core ToLO Patterns 的七子类。这一集合不需要 LLM-specific 扩展 —— 经典污点分析已穷尽。

直接复用 CodeQL 标准库:

ToLO 子类CodeQL 复用 query/library
ToLO-Execpython/code-injection
ToLO-Shellpython/command-line-injection
ToLO-SQLpython/sql-injection
ToLO-Pathpython/path-injection
ToLO-SSRFpython/server-side-request-forgery
ToLO-Deserpython/unsafe-deserialization
ToLO-Templatepython/template-injection(部分需自建)

JS / TS 等其他语言的 sink 集合复用同样的 CodeQL 标准库。

设计原则:Sink 复用

Sink 端不发明,把 novelty 完全压在 source 端。这样设计有几个收益:

  1. baseline 对照实验干净:默认 CodeQL 与 ToLOScanner 的差异完全来自 source 定义,而不是混杂着新 sink。
  2. 规则解释更容易:报告可以说”这是传统 code injection sink,但 source 是 LLM output”,而不是重新命名所有危险 API。这样既尊重已有 CWE,又突出 ToLO 的 source 端贡献
  3. 维护成本低:CodeQL 标准库的 sink 规则一直在更新(新增 SDK、新增危险 API),ToLO 不需要重新发明这部分。

Sink 的现实样子

初学者可以把 sink 集合理解成”真实世界会被影响的操作”:

  • 执行代码 (eval / exec / compile / PythonREPL)
  • 跑命令 (subprocess shell=True / os.system / os.popen)
  • 查数据库 (cursor.execute / pandas.read_sql / SQLAlchemy text)
  • 读写文件 (open / Path.read|write / shutil)
  • 发网络请求 (requests / httpx / urllib)
  • 渲染模板 (Jinja2 Template / Environment.from_string)
  • 恢复对象 (pickle.loads / yaml.load / torch.load)

这些操作本身不一定错;错在它们接收了未受控的 LLM 输出

Sanitizer 集合 C_SAFE(五类)

C_SAFE^schema

Pydantic Strict 模式 + Literal / Enum / 受约束类型;JSON Schema 的 additionalProperties: false + 枚举或正则字段

可识别代码模式:

# Pydantic v2
class X(BaseModel):
model_config = ConfigDict(strict=True, extra="forbid")
action: Literal["a", "b"]
name: Annotated[str, StringConstraints(pattern=r"^[a-z]+$")]
# JSON Schema
{
"type": "object",
"additionalProperties": False,
"properties": {
"action": {"type": "string", "enum": ["a", "b"]},
},
}

C_SAFE^allowlist

显式取值允许列表:

# Tool name allowlist
if tool_name not in ALLOWED_TOOLS:
raise
# Path root jail (先 resolve 再 is_relative_to)
target = (ROOT / user_path).resolve()
if not target.is_relative_to(ROOT):
raise
# URL scheme + host allowlist
u = urlparse(url)
if u.scheme not in {"http", "https"}:
raise
if u.hostname not in ALLOWED_HOSTS:
raise
# SQL table mapping
real_table = ALLOWED_TABLES[llm_table]
if llm_column not in ALLOWED_COLUMNS[llm_table]:
raise

C_SAFE^parameterized

参数化下游调用:

# SQL
cursor.execute("SELECT * FROM t WHERE id = %s", (uid,)) # ✓
# Shell
subprocess.run(["git", "log", branch], shell=False) # ✓
# HTTP
requests.get(BASE_URL, params={"q": q}) # ✓
# Template
env = SandboxedEnvironment(autoescape=True)
tpl = env.from_string(FIXED_TEMPLATE) # ← 模板字符串固定
tpl.render(name=llm_value) # ✓

C_SAFE^safe-codec

安全编解码:

json.loads(x) # 替代 pickle.loads
yaml.safe_load(x) # 替代 yaml.load
ast.literal_eval(x) # 替代 eval (literal)
numexpr.evaluate(x, global_dict={}, local_dict={...}) # 替代 eval (math)
torch.load(path, weights_only=True) # 替代 torch.load (default)

C_SAFE^capability

能力门控:执行 LLM 指定操作前检查是否在当前会话 capability 集合中(参考 IsolateGPT、CaMeL 的设计)。

# 会话级 capability
class Session:
allowed_tools: set[str]
allowed_paths: list[Path]
allowed_hosts: set[str]
# 在 sink 前检查
if tool_name not in session.allowed_tools:
raise PermissionError()
# 容器化 sandbox
docker.containers.run(..., network_disabled=True, read_only=True, ...)
# 最小权限数据库账号
RO_DB = create_engine("postgresql://readonly_user@db/app")

主流框架几乎缺失 —— 这一类目前在检测端没有稳定信号可识别;defense 端建议引入,但 scanner 难以验证是否真的封住了什么。

Sanitizer 必须类型匹配

Sanitizer 不是越多越好,而是必须能切断相应 sink 的语义风险

  • yaml.safe_load 可以处理反序列化风险,但它解出的字符串仍可继续污染 SQL 或 shell
  • subprocess.run([...], shell=False) 能降低 shell injection,但不能证明命令本身被授权
  • SandboxedEnvironment 能限制模板执行能力,但如果模板字符串来自 LLM,仍需确认 sandbox 是否覆盖目标风险

三个容易混淆的判断

代码行为是否 sanitizer原因
json.loads(llm_text)通常不是只解析结构,不限制字段语义
if tool not in ALLOWED: raise通常是明确限制工具集合(C_SAFE^allowlist)
print(llm_text)不是 sink只是展示,不是敏感操作
len(text) < 1000不是 sanitizer长度对绝大多数 sink 无效
re.match(r"^[a-z]+$", x)可能是(允许字符严格)对 SQL 表名 / shell exec 名有效
text.replace("'", "''")错配,不算 sanitizerSQL escape 在现代驱动里不安全,要用参数化
html.escape(text)只对 HTML XSS 是 sanitizer对 SQL / shell / path / URL 不算

ToLO 谓词(集合视图)

一个程序点对 (p_src, p_sink) 触发 ToLO,当且仅当存在污点路径 π: p_src ↝ p_sink,满足:

p_src ∈ S_LLM^{direct, framework, parsed, structured, rag} AND
p_sink ∈ ToLO-{Deser, Exec, Shell, SQL, Path, SSRF, Template} 对应 sink AND
π 上不存在中间节点 q ∈ C_SAFE 对相应数据做了 类型匹配 的清洗

“与 sink 类型匹配”是必要的 —— 对 SQL sink 用 html.escape 不算 sanitize;对路径 sink 用 SQL parameterization 不算 sanitize。

详细的传播规则、容器传播、字符串变换、跨函数 propagation 见 Predicate Rules

告警解释模板

静态分析报告建议按四段展示:

┌──────────────────────────────────────────────────────┐
│ ⚠ ToLO-Path 告警 │
├──────────────────────────────────────────────────────┤
│ Source: app/agent.py:42 │
│ <S_LLM^framework> AIMessage.tool_calls[0]["args"] │
│ │
│ Path: │
│ 42 → 43: args = json.loads(message.tool_calls[0]["args"]) │
│ 43 → 44: path = args["path"] │
│ 44 → 47: target = Path(path) │
│ │
│ Sink: app/agent.py:47 │
│ <ToLO-Path> Path(...).read_text() │
│ 对应被保护对象: 文件系统 │
│ │
│ Missing guard: │
│ 无 C_SAFE^allowlist (路径未经 resolve + is_relative_to 检查) │
│ 无 C_SAFE^capability (会话权限未检查) │
│ │
│ 推荐修复: │
│ - 引入 Path((ROOT / path).resolve()).is_relative_to(ROOT) │
│ - 或者把 path 改为 Literal[...] 文件 ID 映射 │
└──────────────────────────────────────────────────────┘

这个模板能避免报告变成单纯 API 列表,也方便和公开 CVE 案例互相校验。

读完检查

判断下面路径:

S_LLM^structured.name -> f"SELECT * FROM {name}" -> db.execute(...)

如果 name 只是 Pydantic str 字段,没有表名 allowlist,这仍然是 ToLO-SQL 候选路径。结构化输出不是自动 sanitizer。

如果 nameLiteral["users","orders"],这才是 C_SAFE^schema 加上隐式 allowlist —— 安全。

下一步阅读