Hermes Agent:与你一起成长的AI Agent


引言:什么是 Hermes Agent?

2026年的AI Agent领域,开源项目层出不穷,但有一个项目特别引人注目——来自 Nous Research 的 Hermes Agent。这个项目在GitHub上热度飙升,被开发者称为”自改进Agent”的代表。

那么Hermes Agent到底是什么?按照官方定义:

Hermes Agent 是一个与你一起成长的AI Agent,它具有三层记忆架构,能够do → learn → improve的自改进循环,每15个任务后自动优化自己的技能和行为。

简单来说,这不是一个简单的聊天机器人,而是一个能够持续学习、不断进步的AI助手。它的核心理念是:AI Agent应该像人类一样,通过实践经验来成长

本文将深度解析Hermes Agent的架构设计、技术实现和实战应用,帮助开发者理解这个革命性的AI Agent框架。

1. Hermes Agent 核心特性

在深入架构之前,我们先来了解Hermes Agent的核心特性:

1.1 自改进闭环

Hermes Agent 最大的特点是它的自改进闭环:

  • Do(执行): 完成用户任务
  • Learn(学习): 从任务执行中学习经验
  • Improve(改进): 优化未来的行为模式

这个循环每15个任务自动触发一次,让Agent能够从经验中不断进步。

1.2 Honcho 方言用户建模

Hermes Agent 使用独特的”Honcho方言”来构建深度用户画像:

  • 分析用户的语言习惯
  • 理解用户的偏好模式
  • 记忆用户的特殊需求

这种用户建模让Agent能够提供个性化的服务。

1.3 多后端+多平台支持

目前Hermes Agent支持:

  • 6种终端后端: Claude、OpenAI、Gemini、Ollama、LocalAI、OpenRouter
  • 6种消息平台: Discord、Telegram、Slack、WhatsApp、SMS、Email
  • OpenAI兼容端点: 支持多种OpenAI风格API

这种多平台支持让Hermes Agent能够适应各种应用场景。

2. 三层记忆架构深度解析

Hermes Agent最核心的技术创新是它的三层记忆架构。这个架构的设计哲学是:让Agent能够像人一样思考和记忆

2.1 短期记忆(Short-term Memory)

短期记忆是Agent的”工作内存”,处理当前会话的信息:

功能特性

  • 当前会话上下文: 保持对话的连贯性
  • 临时任务状态: 跟踪正在进行的任务
  • 即时反馈处理: 处理用户的实时反馈

技术实现

1
2
3
4
5
6
7
8
9
10
11
12
13
class ShortTermMemory:
def __init__(self):
self.session_context = {}
self.task_states = {}
self.feedback_queue = []

def add_context(self, key, value):
"""添加上下文信息"""
self.session_context[key] = value

def update_task_state(self, task_id, state):
"""更新任务状态"""
self.task_states[task_id] = state

使用场景

  • 多轮对话中的上下文保持
  • 复杂任务的进度跟踪
  • 实时纠错和调整

2.2 长期记忆(Long-term Memory)

长期记忆是Agent的”经验库”,存储跨会话的重要信息:

功能特性

  • 用户偏好存储: 记住用户的喜好和习惯
  • 成功模式保存: 保存有效的解决方案
  • 知识库积累: 积累领域专业知识

技术实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LongTermMemory:
def __init__(self):
self.user_preferences = {}
self.success_patterns = {}
self.knowledge_base = {}

def learn_preference(self, user_id, preference, value):
"""学习用户偏好"""
if user_id not in self.user_preferences:
self.user_preferences[user_id] = {}
self.user_preferences[user_id][preference] = value

def add_success_pattern(self, category, pattern, success_rate):
"""添加成功模式"""
if category not in self.success_patterns:
self.success_patterns[category] = []
self.success_patterns[category].append({
'pattern': pattern,
'success_rate': success_rate,
'timestamp': datetime.now()
})

使用场景

  • 个性化推荐系统
  • 问题解决方案优化
  • 知识问答和推理

2.3 经验层(Experience Layer)

经验层是Agent的”智慧中心”,处理学习和决策:

功能特性

  • 经验模式识别: 识别任务执行的模式
  • 策略优化: 优化执行策略
  • 预测决策: 基于历史经验做预测

技术实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ExperienceLayer:
def __init__(self):
self.pattern_recognizer = PatternRecognizer()
self.strategy_optimizer = StrategyOptimizer()
self.predictor = Predictor()

def analyze_experience(self, task_execution_data):
"""分析执行经验"""
patterns = self.pattern_recognizer.recognize(task_execution_data)
optimized_strategies = self.strategy_optimizer.optimize(patterns)
predictions = self.predictor.predict(optimized_strategies)
return patterns, optimized_strategies, predictions

def improve_behavior(self, analysis_results):
"""改进行为模式"""
for strategy in analysis_results['optimized_strategies']:
self.strategy_optimizer.deploy(strategy)

使用场景

  • 自动化流程优化
  • 智能决策支持
  • 学习能力提升

2.4 三层记忆的协同工作

这三层记忆不是孤立的,而是相互协同工作的:

1
2
3
用户请求 → 短期记忆处理 → 长期记忆支持 → 经验层决策 → 执行结果
↑ ↓
└─────────── 学习反馈 ←─────────────────────────────────┘
  1. 请求处理: 短期记忆处理当前请求
  2. 上下文支持: 长期记忆提供相关历史信息
  3. 智能决策: 经验层基于历史经验做决策
  4. 执行反馈: 执行结果反馈给各层记忆
  5. 持续学习: 从反馈中学习并优化

3. 自改进闭环(Do → Learn → Improve)

Hermes Agent最革命性的设计是它的自改进闭环。这个闭环让Agent能够像人类一样通过经验不断成长。

3.1 Do(执行阶段)

任务执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def execute_task(self, task_description):
"""执行任务"""
# 1. 任务解析和理解
parsed_task = self.task_parser.parse(task_description)

# 2. 策略选择
strategy = self.strategy_selector.select(parsed_task)

# 3. 执行任务
result = self.task_executor.execute(strategy)

# 4. 记录执行数据
execution_data = {
'task': parsed_task,
'strategy': strategy,
'result': result,
'timestamp': datetime.now(),
'success': self.evaluate_success(result)
}

# 5. 存储到短期记忆
self.short_term_memory.add_execution_data(execution_data)

return result

执行策略选择

Agent会根据历史执行数据选择最佳策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
def select_strategy(self, task):
"""选择执行策略"""
# 1. 检查是否有相关的历史成功策略
successful_strategies = self.long_term_memory.get_successful_strategies(task)

if successful_strategies:
# 选择成功率最高的策略
best_strategy = max(successful_strategies,
key=lambda s: s['success_rate'])
return best_strategy

# 2. 如果没有历史策略,使用默认策略
return self.default_strategy

3.2 Learn(学习阶段)

学习机制

每15个任务完成后,Agent会触发学习机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def learn_from_experience(self):
"""从经验中学习"""
# 1. 收集最近15个任务的执行数据
recent_executions = self.short_term_memory.get_recent_executions(15)

# 2. 分析执行模式
patterns = self.experience_layer.analyze_experience(recent_executions)

# 3. 提取成功经验
successful_experiences = [
exec_data for exec_data in recent_executions
if exec_data['success']
]

# 4. 更新长期记忆
for experience in successful_experiences:
self.long_term_memory.add_success_pattern(
experience['task']['category'],
experience['strategy'],
experience['success']
)

# 5. 更新用户偏好
self.update_user_preferences(recent_executions)

# 6. 清理短期记忆
self.short_term_memory.cleanup_old_data()

模式识别算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class PatternRecognizer:
def recognize(self, execution_data):
"""识别执行模式"""
patterns = []

for data in execution_data:
# 1. 任务类型模式
task_pattern = self._recognize_task_pattern(data)
patterns.append(task_pattern)

# 2. 策略效果模式
strategy_pattern = self._recognize_strategy_pattern(data)
patterns.append(strategy_pattern)

# 3. 用户反馈模式
feedback_pattern = self._recognize_feedback_pattern(data)
patterns.append(feedback_pattern)

return patterns

def _recognize_task_pattern(self, data):
"""识别任务类型模式"""
# 分析任务类型与执行效果的关系
task_type = data['task']['category']
success = data['success']

return {
'type': 'task_category',
'category': task_type,
'success_rate': success,
'frequency': len([d for d in execution_data if d['task']['category'] == task_type])
}

3.3 Improve(改进阶段)

行为优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def improve_behavior(self, analysis_results):
"""改进行为模式"""
for pattern in analysis_results['patterns']:
if pattern['type'] == 'strategy':
# 优化策略
self.optimize_strategy(pattern)
elif pattern['type'] == 'response':
# 优化响应模式
self.optimize_response(pattern)
elif pattern['type'] == 'timing':
# 优化执行时机
self.optimize_timing(pattern)

# 更新知识库
self.knowledge_base.update(analysis_results['new_knowledge'])

# 重新构建决策模型
self.rebuild_decision_model()

策略优化算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class StrategyOptimizer:
def optimize(self, patterns):
"""优化执行策略"""
optimized_strategies = []

for pattern in patterns:
if pattern['success_rate'] > 0.8: # 高成功率策略
# 保持并强化
optimized_strategies.append(self._强化策略(pattern))
elif pattern['success_rate'] < 0.3: # 低成功率策略
# 改进或替换
optimized_strategies.append(self._改进策略(pattern))
else:
# 保持现状
optimized_strategies.append(pattern)

return optimized_strategies

def _强化策略(self, strategy):
"""强化高成功率策略"""
return {
**strategy,
'enhanced': True,
'confidence': strategy['success_rate'] * 1.2
}

def _改进策略(self, strategy):
"""改进低成功率策略"""
# 寻找相关的高成功率策略作为参考
similar_strategies = self._find_similar_successful_strategies(strategy)

if similar_strategies:
# 基于成功策略进行改进
improved_strategy = self._基于成功策略改进(strategy, similar_strategies)
return improved_strategy
else:
# 使用全新的策略
return self._generate_new_strategy(strategy)

3.4 自改进循环的触发条件

自改进循环不是机械地每15个任务触发,而是基于智能判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def should_improve(self):
"""判断是否应该触发改进循环"""
# 1. 任务数量检查
if len(self.short_term_memory.get_recent_executions(20)) < 15:
return False

# 2. 成功率检查
recent_success_rate = self.calculate_recent_success_rate()
if recent_success_rate > 0.9:
return True # 高成功率也需要持续优化

# 3. 失败率检查
recent_failure_rate = self.calculate_recent_failure_rate()
if recent_failure_rate > 0.5:
return True # 失败率高需要紧急改进

# 4. 用户反馈检查
recent_feedback = self.short_term_memory.get_recent_feedback()
negative_feedback_count = len([f for f in recent_feedback if f['sentiment'] < 0.3])

if negative_feedback_count > 5:
return True # 用户反馈差需要改进

# 5. 时间检查(至少每周一次)
time_since_last_improvement = datetime.now() - self.last_improvement_time
if time_since_last_improvement > timedelta(days=7):
return True

return False

4. 多后端+多平台支持

4.1 终端后端架构

Hermes Agent支持多种终端后端,通过统一的接口抽象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class BackendManager:
def __init__(self):
self.backends = {
'claude': ClaudeBackend(),
'openai': OpenAIBackend(),
'gemini': GeminiBackend(),
'ollama': OllamaBackend(),
'localai': LocalAIBackend(),
'openrouter': OpenRouterBackend()
}

def get_backend(self, name):
"""获取指定的后端"""
if name not in self.backends:
raise ValueError(f"Backend {name} not supported")
return self.backends[name]

def auto_select_backend(self, requirements):
"""根据需求自动选择后端"""
# 1. 检查用户偏好
user_preference = self.user_memory.get_backend_preference()
if user_preference and user_preference in self.backends:
return self.get_backend(user_preference)

# 2. 根据功能需求选择
if requirements.get('cost_sensitive'):
return self.get_backend('ollama') # 开源模型更经济
elif requirements.get('quality_sensitive'):
return self.get_backend('claude') # Claude质量最高
elif requirements.get('real_time_sensitive'):
return self.get_backend('openai') # OpenAI响应最快

# 3. 默认选择
return self.get_backend('claude')

4.2 消息平台适配器

支持多种消息平台的统一接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MessagePlatformAdapter:
def __init__(self):
self.platforms = {
'discord': DiscordPlatform(),
'telegram': TelegramPlatform(),
'slack': SlackPlatform(),
'whatsapp': WhatsAppPlatform(),
'sms': SMSPlatform(),
'email': EmailPlatform()
}

def send_message(self, platform, message, user_id):
"""发送消息到指定平台"""
if platform not in self.platforms:
raise ValueError(f"Platform {platform} not supported")

platform_adapter = self.platforms[platform]
return platform_adapter.send_message(message, user_id)

def format_message_for_platform(self, message, platform):
"""为不同平台格式化消息"""
platform_adapter = self.platforms[platform]
return platform_adapter.format_message(message)

4.3 OpenAI兼容端点

为了方便集成,Hermes Agent提供了OpenAI兼容的API端点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class OpenAICompatibleEndpoint:
def __init__(self, hermes_agent):
self.agent = hermes_agent

def create_chat_completion(self, messages, **kwargs):
"""创建聊天完成,兼容OpenAI API"""
# 转换消息格式
converted_messages = self._convert_openai_messages(messages)

# 调用Hermes Agent
response = self.agent.process_message(converted_messages)

# 转换为OpenAI格式
return self._convert_to_openai_format(response)

def _convert_openai_messages(self, messages):
"""转换OpenAI消息格式为Hermes格式"""
converted = []
for msg in messages:
converted.append({
'role': msg['role'],
'content': msg['content'],
'timestamp': datetime.now().isoformat()
})
return converted

def _convert_to_openai_format(self, response):
"""转换为OpenAI API响应格式"""
return {
'id': f'chatcmpl-{uuid.uuid4()}',
'object': 'chat.completion',
'created': int(datetime.now().timestamp()),
'model': 'hermes-agent',
'choices': [{
'index': 0,
'message': {
'role': 'assistant',
'content': response['content']
},
'finish_reason': response.get('finish_reason', 'stop')
}],
'usage': response.get('usage', {})
}

5. 实战:从零搭建 Hermes Agent

5.1 环境准备

系统要求

  • Python 3.8+
  • 内存:至少4GB
  • 存储:至少10GB(用于记忆存储)

依赖安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 创建虚拟环境
python -m venv hermes-env
source hermes-env/bin/activate

# 安装依赖
pip install -r requirements.txt

# 主要依赖包
- anthropic: Claude API集成
- openai: OpenAI API集成
- redis: 缓存和记忆存储
- sqlalchemy: 数据库操作
- pydantic: 数据验证
- fastapi: API服务
- uvicorn: ASGI服务器

5.2 基础配置

配置文件(config.yaml)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# Hermes Agent 配置
agent:
name: "MyHermesAgent"
version: "1.0.0"

# 记忆配置
memory:
short_term:
type: "redis"
redis_url: "redis://localhost:6379/0"
ttl: 3600 # 1小时过期

long_term:
type: "sqlite"
database_path: "./data/long_term.db"

experience:
type: "file"
storage_path: "./data/experience.json"
learning_interval: 15 # 每15个任务学习一次

# 后端配置
backends:
claude:
api_key: "${CLAUDE_API_KEY}"
model: "claude-3-5-sonnet-20241022"
max_tokens: 4000

openai:
api_key: "${OPENAI_API_KEY}"
model: "gpt-4"
max_tokens: 4000

# 平台配置
platforms:
discord:
bot_token: "${DISCORD_BOT_TOKEN}"
prefix: "!"

telegram:
bot_token: "${TELEGRAM_BOT_TOKEN}"

环境变量配置(.env)

1
2
3
4
5
6
7
8
9
10
11
# API Keys
CLAUDE_API_KEY=your_claude_api_key_here
OPENAI_API_KEY=your_openai_api_key_here

# Bot Tokens
DISCORD_BOT_TOKEN=your_discord_bot_token_here
TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here

# Database
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///./data/hermes.db

5.3 核心组件初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# main.py
import os
from hermes_agent.core import HermesAgent
from hermes_agent.memory import ShortTermMemory, LongTermMemory, ExperienceLayer
from hermes_agent.backends import BackendManager
from hermes_agent.platforms import MessagePlatformAdapter

def initialize_agent():
"""初始化Hermes Agent"""
# 1. 初始化记忆系统
short_term_memory = ShortTermMemory()
long_term_memory = LongTermMemory()
experience_layer = ExperienceLayer()

# 2. 初始化后端管理器
backend_manager = BackendManager()

# 3. 初始化消息平台适配器
platform_adapter = MessagePlatformAdapter()

# 4. 创建Agent实例
agent = HermesAgent(
name="MyHermesAgent",
short_term_memory=short_term_memory,
long_term_memory=long_term_memory,
experience_layer=experience_layer,
backend_manager=backend_manager,
platform_adapter=platform_adapter
)

return agent

def main():
"""主函数"""
# 初始化Agent
agent = initialize_agent()

# 启动服务
agent.start()

# 保持运行
try:
while True:
pass
except KeyboardInterrupt:
agent.stop()
print("Agent stopped")

if __name__ == "__main__":
main()

5.4 记忆系统配置

短期记忆配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# memory/short_term_memory.py
from redis import Redis
from datetime import datetime, timedelta

class ShortTermMemory:
def __init__(self, redis_url="redis://localhost:6379/0", ttl=3600):
self.redis = Redis.from_url(redis_url)
self.ttl = ttl

def add_session_context(self, session_id, context):
"""添加会话上下文"""
key = f"session:{session_id}:context"
self.redis.setex(key, self.ttl, json.dumps(context))

def get_session_context(self, session_id):
"""获取会话上下文"""
key = f"session:{session_id}:context"
context = self.redis.get(key)
return json.loads(context) if context else {}

def add_task_state(self, task_id, state):
"""添加任务状态"""
key = f"task:{task_id}:state"
self.redis.setex(key, self.ttl, json.dumps(state))

def get_task_state(self, task_id):
"""获取任务状态"""
key = f"task:{task_id}:state"
state = self.redis.get(key)
return json.loads(state) if state else {}

def add_feedback(self, feedback_data):
"""添加反馈数据"""
feedback_id = str(uuid.uuid4())
key = f"feedback:{feedback_id}"
self.redis.setex(key, self.ttl * 2, json.dumps(feedback_data))
return feedback_id

def get_recent_executions(self, count=15):
"""获取最近的执行记录"""
# 这里简化处理,实际应该从有序集合中获取
keys = self.redis.keys("execution:*")
executions = []

for key in keys[-count:]:
execution = self.redis.get(key)
if execution:
executions.append(json.loads(execution))

return sorted(executions, key=lambda x: x['timestamp'])[-count:]

长期记忆配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# memory/long_term_memory.py
from sqlalchemy import create_engine, Column, String, Integer, Float, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

Base = declarative_base()

class UserPreference(Base):
__tablename__ = 'user_preferences'
id = Column(String, primary_key=True)
user_id = Column(String)
preference_type = Column(String)
preference_value = Column(String)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

class SuccessPattern(Base):
__tablename__ = 'success_patterns'
id = Column(String, primary_key=True)
category = Column(String)
pattern = Column(Text)
success_rate = Column(Float)
usage_count = Column(Integer, default=1)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

class LongTermMemory:
def __init__(self, database_url="sqlite:///./data/long_term.db"):
self.engine = create_engine(database_url)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)

def learn_preference(self, user_id, preference_type, preference_value):
"""学习用户偏好"""
session = self.Session()

# 查找现有偏好
existing = session.query(UserPreference).filter_by(
user_id=user_id,
preference_type=preference_type
).first()

if existing:
existing.preference_value = preference_value
existing.updated_at = datetime.now()
else:
new_pref = UserPreference(
user_id=user_id,
preference_type=preference_type,
preference_value=preference_value
)
session.add(new_pref)

session.commit()
session.close()

def add_success_pattern(self, category, pattern, success_rate):
"""添加成功模式"""
session = self.Session()

# 查找现有模式
existing = session.query(SuccessPattern).filter_by(
category=category,
pattern=pattern
).first()

if existing:
# 更新成功率(加权平均)
existing.success_rate = (existing.success_rate * existing.usage_count + success_rate) / (existing.usage_count + 1)
existing.usage_count += 1
existing.updated_at = datetime.now()
else:
new_pattern = SuccessPattern(
category=category,
pattern=pattern,
success_rate=success_rate
)
session.add(new_pattern)

session.commit()
session.close()

def get_successful_strategies(self, task):
"""获取任务的成功策略"""
session = self.Session()

strategies = session.query(SuccessPattern).filter_by(
category=task['category']
).order_by(SuccessPattern.success_rate.desc()).limit(5).all()

session.close()

return [
{
'strategy': strategy.pattern,
'success_rate': strategy.success_rate,
'usage_count': strategy.usage_count
}
for strategy in strategies
]

经验层配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# memory/experience_layer.py
import json
from datetime import datetime, timedelta

class ExperienceLayer:
def __init__(self, storage_path="./data/experience.json"):
self.storage_path = storage_path
self.experience_data = self._load_experience_data()

def _load_experience_data(self):
"""加载经验数据"""
try:
with open(self.storage_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
return {
'patterns': [],
'strategies': [],
'knowledge_base': {},
'last_improvement': datetime.now().isoformat()
}

def _save_experience_data(self):
"""保存经验数据"""
with open(self.storage_path, 'w', encoding='utf-8') as f:
json.dump(self.experience_data, f, ensure_ascii=False, indent=2)

def analyze_experience(self, execution_data):
"""分析执行经验"""
patterns = self._recognize_patterns(execution_data)
strategies = self._analyze_strategies(execution_data)
new_knowledge = self._extract_knowledge(execution_data)

# 更新经验数据
self.experience_data['patterns'].extend(patterns)
self.experience_data['strategies'].extend(strategies)

# 更新知识库
for category, knowledge in new_knowledge.items():
if category not in self.experience_data['knowledge_base']:
self.experience_data['knowledge_base'][category] = []
self.experience_data['knowledge_base'][category].extend(knowledge)

self._save_experience_data()

return {
'patterns': patterns,
'strategies': strategies,
'new_knowledge': new_knowledge
}

def _recognize_patterns(self, execution_data):
"""识别模式"""
patterns = []

for data in execution_data:
# 1. 时间模式
time_pattern = self._analyze_time_patterns(data)
if time_pattern:
patterns.append(time_pattern)

# 2. 任务复杂度模式
complexity_pattern = self._analyze_complexity_patterns(data)
if complexity_pattern:
patterns.append(complexity_pattern)

# 3. 用户反馈模式
feedback_pattern = self._analyze_feedback_patterns(data)
if feedback_pattern:
patterns.append(feedback_pattern)

return patterns

def _analyze_time_patterns(self, data):
"""分析时间模式"""
execution_time = data.get('execution_time', 0)
success = data.get('success', False)
task_category = data.get('task', {}).get('category', 'unknown')

if execution_time > 300: # 超过5分钟
return {
'type': 'time_pattern',
'category': f'slow_{task_category}',
'characteristic': 'slow_execution',
'success': success,
'avg_time': execution_time
}

return None

def _analyze_complexity_patterns(self, data):
"""分析复杂度模式"""
task_complexity = data.get('task', {}).get('complexity', 'medium')
success = data.get('success', False)

return {
'type': 'complexity_pattern',
'complexity': task_complexity,
'success': success,
'success_rate_by_complexity': self._calculate_complexity_success_rate(task_complexity)
}

def _analyze_feedback_patterns(self, data):
"""分析反馈模式"""
feedback = data.get('feedback', {})
if feedback:
return {
'type': 'feedback_pattern',
'sentiment': feedback.get('sentiment', 0),
'satisfaction': feedback.get('satisfaction', 0),
'improvement_suggestions': feedback.get('suggestions', [])
}
return None

def improve_behavior(self, analysis_results):
"""改进行为"""
for pattern in analysis_results['patterns']:
if pattern['type'] == 'time_pattern' and not pattern['success']:
# 针对慢任务优化
self._optimize_slow_tasks(pattern)
elif pattern['type'] == 'complexity_pattern':
# 针对复杂度优化
self._optimize_complexity_handling(pattern)
elif pattern['type'] == 'feedback_pattern':
# 根据反馈优化
self._optimize_based_on_feedback(pattern)

# 更新最后改进时间
self.experience_data['last_improvement'] = datetime.now().isoformat()
self._save_experience_data()

def _optimize_slow_tasks(self, pattern):
"""优化慢任务处理"""
category = pattern['category']
print(f"优化慢任务类别: {category}")

# 这里可以实现具体的优化策略
# 比如添加缓存、并行处理等

# 更新经验数据
if 'optimizations' not in self.experience_data:
self.experience_data['optimizations'] = {}

if category not in self.experience_data['optimizations']:
self.experience_data['optimizations'][category] = []

self.experience_data['optimizations'][category].append({
'type': 'performance_optimization',
'timestamp': datetime.now().isoformat(),
'pattern': pattern
})

def _optimize_complexity_handling(self, pattern):
"""优化复杂度处理"""
complexity = pattern['complexity']
success_rate = pattern['success_rate_by_complexity']

print(f"优化复杂度处理: {complexity}, 当前成功率: {success_rate}")

# 根据复杂度调整策略
if success_rate < 0.5: # 低成功率
# 简化任务分解
self._simplify_task_decomposition(complexity)
elif success_rate > 0.8: # 高成功率
# 增加复杂度处理能力
self._enhance_complexity_handling(complexity)

def _optimize_based_on_feedback(self, pattern):
"""根据反馈优化"""
sentiment = pattern['sentiment']
satisfaction = pattern['satisfaction']

print(f"根据用户反馈优化,情感倾向: {sentiment}, 满意度: {satisfaction}")

if sentiment < 0.3: # 负面反馈
# 需要紧急改进
self._handle_negative_feedback(pattern)
elif satisfaction < 0.6: # 中等满意度
# 持续改进
self._handle_moderate_feedback(pattern)

5.5 后端管理器配置

# backends/backend_manager.py
from hermes_agent.backends.claude import ClaudeBackend
from hermes_agent.backends.openai import OpenAIBackend
from hermes_agent.backends.gemini import GeminiBackend
from hermes_agent.backends.ollama import OllamaBackend
from hermes_agent.backends.localai import LocalAIBackend
from hermes_agent.backends.openrouter import OpenRouterBackend

class BackendManager:
    def __init__(self, config):
        self.config = config
        self.backends = {}
        self._initialize_backends()
    
    def _initialize_backends(self):
        """初始化所有后端"""
        # Claude
        if 'claude' in self.config:
            self.backends['claude'] = ClaudeBackend(self.config['claude'])
        
        # OpenAI
        if 'openai' in self.config:
            self.backends['openai'] = OpenAIBackend(self.config['openai'])
        
        # Gemini
        if 'gemini' in self.config:
            self.backends['gemini'] = GeminiBackend(self.config['gemini'])
        
        # Ollama
        if 'ollama' in self.config:
            self.backends['ollama'] = OllamaBackend(self.config['ollama'])
        
        # LocalAI
        if 'localai' in self.config:
            self.backends['localai'] = LocalAIBackend(self.config['localai'])
        
        # OpenRouter
        if 'openrouter' in self.config:
            self.backends['openrouter'] = OpenRouterBackend(self.config['openrouter'])
    
    def get_backend(self, name):
        """获取指定的后端"""
        if name not in self.backends:
            raise ValueError(f"Backend {name} not supported")
        return self.backends[name]
    
    def auto_select_backend(self, requirements):
        """根据需求自动选择后端"""
        # 1. 检查用户偏好
        user_preference = self.get_user_preference()
        if user_preference and user_preference in self.backends:
            return self.get_backend(user_preference)
        
        # 2. 根据需求选择
        if requirements.get('cost_sensitive'):
            # 成本敏感:选择开源模型
            return self._select_cost_effective_backend()
        elif requirements.get('quality_sensitive'):
            # 质量敏感:选择最佳质量模型
            return self._select_high_quality_backend()
        elif requirements.get('real_time_sensitive'):
            # 实时敏感:选择响应最快的模型
            return self._select_fast_backend()
        elif requirements.get('privacy_sensitive'):
            # 隐私敏感:选择本地模型
            return self._select_private_backend()
        
        # 3. 默认选择(平衡质量和成本)
        return self._select_balanced_backend()
    
    def _select_cost_effective_backend(self):
        """选择成本效益高的后端"""
        # 优先选择开源模型
        priority_order = ['ollama', 'localai', 'openrouter', 'openai', 'claude', 'gemini']
        
        for backend_name in priority_order:
            if backend_name in self.backends:
                return self.get_backend(backend_name)
        
        raise ValueError("No cost-effective backend available")
    
    def _select_high_quality_backend(self):
        """选择高质量后端"""
        # Claude和OpenAI通常质量最高
        priority_order = ['claude', 'openai', 'gemini', 'openrouter', 'localai', 'ollama']
        
        for backend_name in priority_order:
            if backend_name in self.backends:
                return self.get_backend(backend_name)
        
        raise ValueError("No high-quality backend available")
    
    def _select_fast_backend(self):
        """选择快速响应的后端"""
        # OpenAI通常响应最快
        priority_order = ['openai', 'claude', 'gemini', 'openrouter', 'ollama', 'localai']
        
        for backend_name in priority_order:
            if backend_name in self.backends:
                return self.get_backend(backend_name)
        
        raise ValueError("No fast backend available")
    
    def _select_private_backend(self):
        """选择私有后端(本地运行)"""
        # 本地模型最安全
        priority_order = ['localai', 'ollama', 'openrouter', 'openai', 'claude', 'gemini']
        
        for backend_name in priority_order:
            if backend_name in self.backends:
                return self.get_backend(backend_name)
        
        raise ValueError("No private backend available")
    
    def _select_bal