打造你的AI个人助理:全栈指南与实战教程
1. AI个人助理架构全景
1.1 核心模块设计
graph TB
A[AI个人助理系统] --> B[输入层]
A --> C[处理层]
A --> D[输出层]
A --> E[存储层]
B --> B1[语音输入]
B --> B2[文本输入]
B --> B3[API调用]
B --> B4[文件上传]
C --> C1[NLP理解]
C --> C2[任务分类]
C --> C3[智能调度]
C --> C4[记忆管理]
D --> D1[语音输出]
D --> D2[文本回复]
D --> D3[执行动作]
D --> D4[文件生成]
E --> E1[向量数据库]
E --> E2[关系数据库]
E --> E3[文件存储]
E --> E4[缓存系统]
1.2 技术栈选择矩阵
| 功能模块 |
推荐方案 |
开源替代 |
成本考量 |
| 核心AI引擎 |
GPT-4 API + Claude API |
Llama 3 + Ollama |
按使用量 |
| 语音识别 |
Whisper API |
Whisper.cpp |
免费 |
| 语音合成 |
ElevenLabs |
Coqui TTS |
按字符 |
| 任务自动化 |
Zapier/Make + APIs |
n8n + Python |
免费-付费 |
| 本地部署 |
Docker + FastAPI |
纯Python脚本 |
硬件成本 |
| 知识库 |
Pinecone/Weaviate |
ChromaDB |
免费-付费 |
| 界面 |
Streamlit/Gradio |
微信机器人 |
免费 |
2. 基础版:微信AI助理(24小时部署)
2.1 基于ChatGPT的微信机器人
"""
基于itchat和OpenAI的微信个人助理
24小时内可部署完成的基础版本
"""
import itchat
import openai
import json
import os
from datetime import datetime
from threading import Thread
import schedule
import time
class WeChatAIAssistant:
def __init__(self, config_path="config.json"):
"""初始化AI助理"""
with open(config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
openai.api_key = self.config['openai_api_key']
self.conversation_history = {}
self.user_profiles = {}
self.skills = self._load_skills()
self.scheduled_tasks = []
print("🤖 AI个人助理初始化完成")
def _load_skills(self):
"""加载预设技能"""
skills = {
"日程管理": self.handle_schedule,
"天气查询": self.handle_weather,
"新闻摘要": self.handle_news,
"翻译服务": self.handle_translation,
"计算助手": self.handle_calculation,
"知识问答": self.handle_qa,
"备忘录": self.handle_note,
"文件处理": self.handle_file
}
return skills
def classify_intent(self, message):
"""智能意图分类"""
prompt = f"""
请分析用户的意图,从以下选项中选择:
1. 日程管理(安排、提醒、查看日程)
2. 天气查询(天气、温度、预报)
3. 新闻摘要(新闻、头条、热点)
4. 翻译服务(翻译、中英文转换)
5. 计算助手(计算、换算、单位转换)
6. 知识问答(问题、查询、解释)
7. 备忘录(记录、记住、备忘)
8. 文件处理(处理、分析、总结文件)
9. 闲聊(其他对话)
用户消息:{message}
返回格式:{{"intent": "意图类型", "confidence": 置信度0-1}}
"""
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
result = json.loads(response.choices[0].message.content)
return result
except:
return {"intent": "闲聊", "confidence": 0.5}
def handle_message(self, message, user_id):
"""处理用户消息"""
if user_id not in self.conversation_history:
self.conversation_history[user_id] = []
self.conversation_history[user_id].append({
"role": "user",
"content": message,
"time": datetime.now().isoformat()
})
if len(self.conversation_history[user_id]) > 10:
self.conversation_history[user_id] = self.conversation_history[user_id][-10:]
intent_result = self.classify_intent(message)
if intent_result["confidence"] > 0.8:
intent = intent_result["intent"]
if intent in self.skills:
return self.skills[intent](message, user_id)
return self.generate_gpt_response(message, user_id)
def generate_gpt_response(self, message, user_id):
"""生成GPT回复"""
context = [
{"role": "system", "content": self.config['system_prompt']}
]
if user_id in self.conversation_history:
for msg in self.conversation_history[user_id][-5:]:
context.append({
"role": msg["role"],
"content": msg["content"]
})
context.append({"role": "user", "content": message})
try:
response = openai.ChatCompletion.create(
model=self.config.get('model', 'gpt-3.5-turbo'),
messages=context,
temperature=0.7,
max_tokens=1000
)
reply = response.choices[0].message.content
self.conversation_history[user_id].append({
"role": "assistant",
"content": reply,
"time": datetime.now().isoformat()
})
return reply
except Exception as e:
return f"抱歉,我暂时无法处理您的请求。错误:{str(e)}"
def handle_schedule(self, message, user_id):
"""处理日程管理"""
prompt = f"""
从以下消息中提取日程信息:
消息:{message}
请提取:
1. 事件内容
2. 日期和时间(如未指定,默认为今天)
3. 提醒时间(提前多少分钟)
返回JSON格式。
"""
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
schedule_info = json.loads(response.choices[0].message.content)
if user_id not in self.user_profiles:
self.user_profiles[user_id] = {"schedules": []}
self.user_profiles[user_id]["schedules"].append({
"event": schedule_info.get("event"),
"datetime": schedule_info.get("datetime"),
"reminder": schedule_info.get("reminder"),
"created": datetime.now().isoformat()
})
reminder_time = schedule_info.get("reminder", "10分钟前")
self.scheduled_tasks.append({
"user_id": user_id,
"event": schedule_info.get("event"),
"time": reminder_time
})
return f"✅ 已为您安排日程:{schedule_info.get('event')},时间:{schedule_info.get('datetime')},我会提前{reminder_time}提醒您。"
except Exception as e:
return f"安排日程时出错:{str(e)},请重新描述。"
def handle_weather(self, message, user_id):
"""处理天气查询"""
prompt = f"""
从以下消息中提取地点信息:
消息:{message}
返回格式:{{"location": "地点"}}
"""
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
location_info = json.loads(response.choices[0].message.content)
location = location_info.get("location", "北京")
weather_data = {
"location": location,
"temperature": "25°C",
"condition": "晴天",
"humidity": "65%",
"wind": "3级"
}
return f"🌤️ {location}的天气:\n温度:{weather_data['temperature']}\n天气:{weather_data['condition']}\n湿度:{weather_data['humidity']}\n风力:{weather_data['wind']}"
except Exception as e:
return f"查询天气时出错:{str(e)}"
def handle_translation(self, message, user_id):
"""处理翻译"""
prompt = f"""
请翻译以下内容,识别源语言并翻译为目标语言:
内容:{message}
返回格式:{{"original": "原文", "translation": "翻译", "source_lang": "源语言", "target_lang": "目标语言"}}
"""
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
translation = json.loads(response.choices[0].message.content)
return f"🌐 翻译结果:\n原文({translation['source_lang']}):{translation['original']}\n翻译({translation['target_lang']}):{translation['translation']}"
except Exception as e:
return f"翻译时出错:{str(e)}"
def run_scheduler(self):
"""运行计划任务"""
while True:
now = datetime.now()
for task in self.scheduled_tasks[:]:
print(f"⏰ 提醒:{task['event']}")
self.scheduled_tasks.remove(task)
time.sleep(60)
def start(self):
"""启动微信机器人"""
itchat.auto_login(hotReload=True, enableCmdQR=2)
scheduler_thread = Thread(target=self.run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
user_id = msg['FromUserName']
response = self.handle_message(msg['Text'], user_id)
return response
print("🚀 AI个人助理已启动,等待消息...")
itchat.run()
"""
{
"openai_api_key": "your-api-key-here",
"model": "gpt-3.5-turbo",
"system_prompt": "你是一个专业的个人助理,可以帮助用户管理日程、查询信息、处理文件等。请以友好、专业的态度回应用户。",
"skills": ["schedule", "weather", "translation", "news", "calculation"],
"weather_api_key": "",
"news_api_key": ""
}
"""
if __name__ == "__main__":
assistant = WeChatAIAssistant("config.json")
assistant.start()
2.2 部署与配置指南
# 24小时部署指南
## 第1-4小时:环境准备
1. 注册OpenAI API账号,获取API Key
2. 安装Python 3.8+和pip
3. 安装依赖包:
```bash
pip install itchat openai schedule requests python-dotenv
第5-8小时:基础功能开发
- 复制上面的代码到wechat_ai_assistant.py
- 创建config.json配置文件
- 测试基本对话功能
第9-12小时:技能扩展
- 添加更多技能处理函数
- 集成外部API(天气、新闻等)
- 实现文件处理功能
第13-16小时:优化与测试
- 添加错误处理
- 优化用户体验
- 测试各种场景
第17-20小时:部署上线
- 购买云服务器(腾讯云/阿里云)
- 配置环境,设置自启动
- 扫描二维码登录微信
第21-24小时:监控与维护
- 添加日志系统
- 设置API使用监控
- 准备应急方案
## 3. 进阶版:全功能桌面AI助理
### 3.1 基于Streamlit的Web界面
```python
# advanced_assistant.py
"""
全功能桌面AI助理,支持语音、文件、日程管理等
"""
import streamlit as st
import openai
import whisper
import pyttsx3
import speech_recognition as sr
from datetime import datetime, timedelta
import pandas as pd
import json
import os
import uuid
from pathlib import Path
import pickle
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.memory import ConversationBufferMemory
import schedule
import threading
import time
class AdvancedAIAssistant:
def __init__(self):
"""初始化高级AI助理"""
st.set_page_config(
page_title="AI个人助理",
page_icon="🤖",
layout="wide"
)
# 初始化会话状态
if 'assistant_initialized' not in st.session_state:
self._initialize_session_state()
# 加载配置
self._load_config()
# 初始化组件
self._initialize_components()
def _initialize_session_state(self):
"""初始化会话状态"""
st.session_state.assistant_initialized = True
st.session_state.conversation_history = []
st.session_state.memory_context = []
st.session_state.scheduled_tasks = []
st.session_state.documents = []
st.session_state.knowledge_base = None
st.session_state.user_profile = {}
st.session_state.api_usage = {"tokens": 0, "cost": 0.0}
def _load_config(self):
"""加载配置"""
# 从环境变量或配置文件加载
self.openai_api_key = st.secrets.get("OPENAI_API_KEY", "")
openai.api_key = self.openai_api_key
# 其他配置
self.config = {
"model": "gpt-4",
"temperature": 0.7,
"max_tokens": 2000,
"embedding_model": "text-embedding-ada-002"
}
def _initialize_components(self):
"""初始化各个组件"""
# 初始化语音识别(如果配置了)
try:
self.recognizer = sr.Recognizer()
self.microphone = sr.Microphone()
except:
st.warning("麦克风未检测到,语音功能受限")
# 初始化知识库
self._initialize_knowledge_base()
def _initialize_knowledge_base(self):
"""初始化向量知识库"""
persist_directory = "./knowledge_base"
if os.path.exists(persist_directory):
try:
embeddings = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
st.session_state.knowledge_base = Chroma(
persist_directory=persist_directory,
embedding_function=embeddings
)
st.success("✅ 知识库加载成功")
except Exception as e:
st.error(f"知识库加载失败: {str(e)}")
def build_sidebar(self):
"""构建侧边栏"""
with st.sidebar:
st.title("🤖 AI个人助理")
# 模型选择
model_option = st.selectbox(
"选择AI模型",
["gpt-3.5-turbo", "gpt-4", "claude-3-sonnet"],
index=0
)
self.config["model"] = model_option
# 温度调节
temperature = st.slider(
"创意度 (Temperature)",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.1
)
self.config["temperature"] = temperature
# 功能开关
st.subheader("功能设置")
col1, col2 = st.columns(2)
with col1:
voice_enabled = st.checkbox("语音功能", value=True)
file_processing = st.checkbox("文件处理", value=True)
with col2:
schedule_enabled = st.checkbox("日程管理", value=True)
memory_enabled = st.checkbox("长期记忆", value=True)
# API使用情况
st.subheader("API使用情况")
st.metric("Tokens使用", f"{st.session_state.api_usage['tokens']:,}")
st.metric("估算成本", f"${st.session_state.api_usage['cost']:.4f}")
# 快捷操作
st.subheader("快捷操作")
if st.button("🔄 清空对话"):
st.session_state.conversation_history = []
st.rerun()
if st.button("💾 备份数据"):
self._backup_data()
def build_main_interface(self):
"""构建主界面"""
# 标题区域
st.title("✨ 我的AI个人助理")
# 创建标签页
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"💬 对话", "📅 日程", "📚 知识库", "📁 文件", "⚙️ 设置"
])
with tab1:
self._build_chat_interface()
with tab2:
self._build_schedule_interface()
with tab3:
self._build_knowledge_base_interface()
with tab4:
self._build_file_interface()
with tab5:
self._build_settings_interface()
def _build_chat_interface(self):
"""构建聊天界面"""
# 语音输入
col1, col2 = st.columns([4, 1])
with col1:
user_input = st.text_area(
"输入消息",
placeholder="输入您的问题或指令...",
height=100,
key="user_input"
)
with col2:
st.write("语音输入")
if st.button("🎤 开始录音", use_container_width=True):
voice_text = self._voice_to_text()
if voice_text:
st.session_state.user_input = voice_text
st.rerun()
if st.button("🔊 播放回复", use_container_width=True):
self._text_to_voice(st.session_state.conversation_history[-1]["content"]
if st.session_state.conversation_history else "")
# 发送按钮
if st.button("🚀 发送", type="primary", use_container_width=True):
if user_input:
self._process_user_input(user_input)
st.rerun()
# 显示对话历史
st.divider()
st.subheader("对话历史")
for i, msg in enumerate(reversed(st.session_state.conversation_history[-20:])):
with st.chat_message(msg["role"]):
st.write(msg["content"])
st.caption(msg.get("time", ""))
# 添加操作按钮
col1, col2, col3 = st.columns([1, 1, 8])
with col1:
if st.button("📋", key=f"copy_{i}"):
st.code(msg["content"])
with col2:
if st.button("🗑️", key=f"delete_{i}"):
st.session_state.conversation_history.pop(-i-1)
st.rerun()
def _build_schedule_interface(self):
"""构建日程管理界面"""
st.header("📅 日程管理")
col1, col2 = st.columns([2, 1])
with col1:
# 添加新日程
st.subheader("添加新日程")
with st.form("schedule_form"):
event = st.text_input("事件内容")
date = st.date_input("日期")
time = st.time_input("时间")
reminder = st.selectbox(
"提醒时间",
["不提醒", "5分钟前", "10分钟前", "30分钟前", "1小时前"]
)
if st.form_submit_button("添加日程"):
new_task = {
"id": str(uuid.uuid4()),
"event": event,
"date": date.isoformat(),
"time": time.isoformat(),
"reminder": reminder,
"created": datetime.now().isoformat(),
"status": "pending"
}
st.session_state.scheduled_tasks.append(new_task)
st.success("日程添加成功!")
st.rerun()
with col2:
# 今日日程
st.subheader("今日日程")
today = datetime.now().date()
todays_tasks = [
t for t in st.session_state.scheduled_tasks
if datetime.fromisoformat(t["date"]).date() == today
]
for task in todays_tasks:
with st.expander(f"{task['time'][:5]} - {task['event']}"):
st.write(f"提醒: {task['reminder']}")
if st.button("标记完成", key=f"complete_{task['id']}"):
task["status"] = "completed"
st.rerun()
# 所有日程列表
st.subheader("所有日程")
if st.session_state.scheduled_tasks:
df = pd.DataFrame(st.session_state.scheduled_tasks)
st.dataframe(
df[["date", "time", "event", "status"]],
use_container_width=True
)
else:
st.info("暂无日程安排")
def _build_knowledge_base_interface(self):
"""构建知识库界面"""
st.header("📚 个人知识库")
# 上传文档
uploaded_file = st.file_uploader(
"上传文档到知识库",
type=["txt", "pdf", "docx", "md", "csv"],
key="kb_upload"
)
if uploaded_file is not None:
# 保存文件
file_path = f"./uploads/{uploaded_file.name}"
os.makedirs("./uploads", exist_ok=True)
with open(file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
# 处理文档
if st.button("处理并添加到知识库"):
with st.spinner("正在处理文档..."):
self._process_document(file_path)
st.success("文档已添加到知识库!")
# 知识库查询
st.subheader("知识库查询")
kb_query = st.text_input("查询知识库")
if st.button("搜索") and kb_query:
results = self._query_knowledge_base(kb_query)
for result in results:
with st.expander(f"相关度: {result['score']:.2f}"):
st.write(result["content"])
def _build_file_interface(self):
"""构建文件处理界面"""
st.header("📁 文件处理中心")
# 文件上传
uploaded_files = st.file_uploader(
"上传文件进行处理",
type=["txt", "pdf", "docx", "xlsx", "csv", "jpg", "png"],
accept_multiple_files=True,
key="file_upload"
)
if uploaded_files:
for uploaded_file in uploaded_files:
col1, col2, col3 = st.columns([3, 1, 1])
with col1:
st.write(f"**{uploaded_file.name}** ({uploaded_file.size/1024:.1f} KB)")
with col2:
if st.button("分析", key=f"analyze_{uploaded_file.name}"):
self._analyze_file(uploaded_file)
with col3:
if st.button("总结", key=f"summarize_{uploaded_file.name}"):
self._summarize_file(uploaded_file)
# 最近处理的文件
st.subheader("最近处理的文件")
if st.session_state.documents:
for doc in st.session_state.documents[-5:]:
st.write(f"- {doc['name']} ({doc['type']})")
def _build_settings_interface(self):
"""构建设置界面"""
st.header("⚙️ 系统设置")
# API设置
with st.expander("API设置", expanded=True):
new_api_key = st.text_input(
"OpenAI API Key",
value=self.openai_api_key,
type="password"
)
if new_api_key != self.openai_api_key:
self.openai_api_key = new_api_key
openai.api_key = new_api_key
st.success("API Key已更新")
# 助理人格设置
with st.expander("助理人格设置"):
personality = st.selectbox(
"选择助理人格",
["专业助手", "创意伙伴", "严谨顾问", "幽默朋友"],
index=0
)
custom_prompt = st.text_area(
"自定义系统提示词",
value="你是一个有帮助的AI助理。",
height=150
)
if st.button("保存人格设置"):
st.session_state.system_prompt = custom_prompt
st.success("人格设置已保存")
# 数据管理
with st.expander("数据管理"):
if st.button("导出所有数据"):
self._export_data()
if st.button("导入数据"):
uploaded_data = st.file_uploader("选择数据文件", type=["json"])
if uploaded_data:
self._import_data(uploaded_data)
# 系统信息
with st.expander("系统信息"):
st.write(f"Python版本: {os.sys.version}")
st.write(f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
st.write(f"对话数量: {len(st.session_state.conversation_history)}")
st.write(f"知识库文档: {len(st.session_state.documents)}")
def _process_user_input(self, user_input):
"""处理用户输入"""
# 保存用户消息
st.session_state.conversation_history.append({
"role": "user",
"content": user_input,
"time": datetime.now().strftime("%H:%M:%S")
})
# 构建上下文
context_messages = self._build_context_messages(user_input)
# 获取AI回复
with st.spinner("思考中..."):
response = self._get_ai_response(context_messages)
# 保存AI回复
st.session_state.conversation_history.append({
"role": "assistant",
"content": response,
"time": datetime.now().strftime("%H:%M:%S")
})
# 更新API使用统计
self._update_api_usage(user_input, response)
def _build_context_messages(self, user_input):
"""构建上下文消息"""
messages = [
{"role": "system", "content": st.session_state.get(
"system_prompt",
"你是一个有帮助的AI助理。请以友好、专业的方式回应用户。"
)}
]
# 添加上下文记忆(最近5轮)
recent_history = st.session_state.conversation_history[-10:]
for msg in recent_history:
messages.append({"role": msg["role"], "content": msg["content"]})
# 添加知识库相关内容
if st.session_state.knowledge_base and len(user_input) > 5:
relevant_info = self._query_knowledge_base(user_input, limit=2)
if relevant_info:
kb_context = "\n".join([r["content"] for r in relevant_info])
messages.append({
"role": "system",
"content": f"相关背景信息:{kb_context}"
})
return messages
def _get_ai_response(self, messages):
"""获取AI回复"""
try:
response = openai.ChatCompletion.create(
model=self.config["model"],
messages=messages,
temperature=self.config["temperature"],
max_tokens=self.config["max_tokens"]
)
return response.choices[0].message.content
except Exception as e:
return f"抱歉,处理请求时出错:{str(e)}"
def _voice_to_text(self):
"""语音转文字"""
try:
with self.microphone as source:
st.info("正在录音...请说话")
audio = self.recognizer.listen(source, timeout=5)
text = self.recognizer.recognize_google(audio, language='zh-CN')
return text
except sr.WaitTimeoutError:
st.warning("录音超时")
return None
except Exception as e:
st.error(f"语音识别错误: {str(e)}")
return None
def _text_to_voice(self, text):
"""文字转语音"""
try:
engine = pyttsx3.init()
engine.say(text)
engine.runAndWait()
except Exception as e:
st.error(f"语音合成错误: {str(e)}")
def _process_document(self, file_path):
"""处理文档到知识库"""
try:
# 读取文档内容
if file_path.endswith('.txt'):
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
elif file_path.endswith('.pdf'):
import PyPDF2
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
else:
text = f"文件内容: {file_path}"
# 分割文本
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = text_splitter.split_text(text)
# 创建或更新向量存储
embeddings = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
if st.session_state.knowledge_base is None:
st.session_state.knowledge_base = Chroma.from_texts(
chunks,
embeddings,
persist_directory="./knowledge_base"
)
else:
st.session_state.knowledge_base.add_texts(chunks)
# 保存文档信息
st.session_state.documents.append({
"name": os.path.basename(file_path),
"path": file_path,
"type": os.path.splitext(file_path)[1],
"chunks": len(chunks),
"added": datetime.now().isoformat()
})
# 持久化
st.session_state.knowledge_base.persist()
except Exception as e:
st.error(f"文档处理失败: {str(e)}")
def _query_knowledge_base(self, query, limit=3):
"""查询知识库"""
if st.session_state.knowledge_base is None:
return []
try:
results = st.session_state.knowledge_base.similarity_search_with_score(
query, k=limit
)
return [
{"content": doc.page_content, "score": score}
for doc, score in results
]
except Exception as e:
st.error(f"知识库查询失败: {str(e)}")
return []
def _analyze_file(self, file):
"""分析文件"""
try:
if file.name.endswith('.csv') or file.name.endswith('.xlsx'):
# 分析数据文件
if file.name.endswith('.csv'):
df = pd.read_csv(file)
else:
df = pd.read_excel(file)
analysis = f"""
📊 文件分析报告:
**基本信息**
- 文件名:{file.name}
- 文件大小:{file.size/1024:.1f} KB
- 数据维度:{df.shape[0]} 行 × {df.shape[1]} 列
**数据概览**
- 数值列:{len(df.select_dtypes(include=['number']).columns)} 个
- 文本列:{len(df.select_dtypes(include=['object']).columns)} 个
- 缺失值:{df.isnull().sum().sum()} 个
**统计摘要**
{df.describe().to_string()}
"""
st.info(analysis)
# 显示预览
st.subheader("数据预览")
st.dataframe(df.head(), use_container_width=True)
except Exception as e:
st.error(f"文件分析失败: {str(e)}")
def _summarize_file(self, file):
"""总结文件内容"""
try:
# 读取文件内容
content = file.getvalue().decode('utf-8')[:5000] # 限制长度
prompt = f"""
请总结以下文件内容:
文件名:{file.name}
内容:
{content}
请提供:
1. 主要内容摘要
2. 关键要点(3-5个)
3. 可能的用途
"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=500
)
summary = response.choices[0].message.content
st.success("文件总结完成!")
st.write(summary)
except Exception as e:
st.error(f"文件总结失败: {str(e)}")
def _update_api_usage(self, input_text, output_text):
"""更新API使用统计"""
# 简化的token估算
input_tokens = len(input_text) // 4
output_tokens = len(output_text) // 4
st.session_state.api_usage["tokens"] += input_tokens + output_tokens
# 简化的成本估算(按gpt-3.5-turbo价格)
cost_per_token = 0.002 / 1000 # 每千token $0.002
cost = (input_tokens + output_tokens) * cost_per_token
st.session_state.api_usage["cost"] += cost
def _backup_data(self):
"""备份数据"""
backup_data = {
"conversation_history": st.session_state.conversation_history,
"scheduled_tasks": st.session_state.scheduled_tasks,
"documents": st.session_state.documents,
"user_profile": st.session_state.user_profile,
"backup_time": datetime.now().isoformat()
}
backup_file = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(backup_file, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, ensure_ascii=False, indent=2)
st.success(f"数据已备份到 {backup_file}")
def _export_data(self):
"""导出数据"""
self._backup_data()
def _import_data(self, file):
"""导入数据"""
try:
data = json.load(file)
st.session_state.conversation_history = data.get("conversation_history", [])
st.session_state.scheduled_tasks = data.get("scheduled_tasks", [])
st.session_state.documents = data.get("documents", [])
st.session_state.user_profile = data.get("user_profile", {})
st.success("数据导入成功!")
st.rerun()
except Exception as e:
st.error(f"数据导入失败: {str(e)}")
def run_scheduler(self):
"""运行计划任务"""
def check_tasks():
now = datetime.now()
for task in st.session_state.scheduled_tasks:
if task["status"] != "pending":
continue
task_time = datetime.fromisoformat(f"{task['date']} {task['time']}")
# 检查是否需要提醒
reminder_minutes = {
"不提醒": 0,
"5分钟前": 5,
"10分钟前": 10,
"30分钟前": 30,
"1小时前": 60
}.get(task["reminder"], 0)
reminder_time = task_time - timedelta(minutes=reminder_minutes)
if now >= reminder_time and now < task_time:
# 发送提醒
reminder_msg = f"⏰ 提醒:{task['event']} 将在{reminder_minutes}分钟后开始"
st.session_state.conversation_history.append({
"role": "system",
"content": reminder_msg,
"time": now.strftime("%H:%M:%S")
})
# 标记为已提醒
task["reminded"] = True
# 每分钟检查一次
schedule.every(1).minutes.do(check_tasks)
while True:
schedule.run_pending()
time.sleep(1)
def run(self):
"""运行AI助理"""
# 启动计划任务线程
scheduler_thread = threading.Thread(target=self.run_scheduler, daemon=True)
scheduler_thread.start()
# 构建界面
self.build_sidebar()
self.build_main_interface()
# 运行应用
if __name__ == "__main__":
assistant = AdvancedAIAssistant()
assistant.run()
3.2 部署配置
version: '3.8'
services:
ai-assistant:
build: .
container_name: ai-personal-assistant
ports:
- "8501:8501"
volumes:
- ./data:/app/data
- ./knowledge_base:/app/knowledge_base
- ./uploads:/app/uploads
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- STREAMLIT_SERVER_PORT=8501
- STREAMLIT_SERVER_ADDRESS=0.0.0.0
restart: unless-stopped
networks:
- ai-network
chromadb:
image: chromadb/chroma
container_name: chroma-db
ports:
- "8000:8000"
volumes:
- ./chroma_data:/chroma/chroma
networks:
- ai-network
networks:
ai-network:
driver: bridge
OPENAI_API_KEY=your-api-key-here
FROM python:3.9-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
gcc \
g++ \
portaudio19-dev \
espeak \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN mkdir -p data knowledge_base uploads
EXPOSE 8501
CMD ["streamlit", "run", "advanced_assistant.py", "--server.port=8501", "--server.address=0.0.0.0"]
# requirements.txt
streamlit>=1.28.0
openai>=1.3.0
langchain>=0.0.340
chromadb>=0.4.15
whisper>=1.1.10
speechrecognition>=3.10.0
pyttsx3>=2.90
pydub>=0.25.1
pandas>=2.0.0
numpy>=1.24.0
pypdf2>=3.0.0
python-docx>=1.1.0
schedule>=1.2.0
python-dotenv>=1.0.0
4. 专业版:企业级AI助理系统
4.1 架构设计
"""
企业级AI助理系统架构
支持多用户、权限管理、审计日志等
"""
class EnterpriseAIAssistant:
"""企业级AI助理主类"""
def __init__(self, config_path="config/assistant.yaml"):
self.config = self._load_config(config_path)
self.db = DatabaseManager(self.config['database'])
self.auth = AuthenticationManager(self.config['auth'])
self.logger = AuditLogger(self.config['logging'])
self.plugin_manager = PluginManager()
self._initialize_modules()
def _initialize_modules(self):
"""初始化所有模块"""
modules = {
'workflow': WorkflowEngine,
'knowledge': KnowledgeGraphManager,
'automation': TaskAutomationEngine,
'analytics': UserAnalytics,
'integration': ThirdPartyIntegrations
}
for name, module_class in modules.items():
setattr(self, name, module_class(self.config.get(name, {})))
4.2 核心功能实现
"""
智能工作流引擎
"""
class WorkflowEngine:
"""工作流引擎,支持可视化流程设计"""
def __init__(self, config):
self.config = config
self.workflows = {}
self.triggers = {}
self.load_workflows()
def create_workflow(self, name, steps, triggers, permissions):
"""创建新工作流"""
workflow_id = str(uuid.uuid4())
workflow = {
'id': workflow_id,
'name': name,
'steps': steps,
'triggers': triggers,
'permissions': permissions,
'created_at': datetime.now(),
'updated_at': datetime.now(),
'status': 'active'
}
if self._validate_workflow(workflow):
self.workflows[workflow_id] = workflow
self._register_triggers(workflow_id, triggers)
self.db.save_workflow(workflow)
return workflow_id
else:
raise ValueError("工作流验证失败")
def execute_workflow(self, workflow_id, context):
"""执行工作流"""
workflow = self.workflows.get(workflow_id)
if not workflow:
raise ValueError(f"工作流 {workflow_id} 不存在")
execution_id = str(uuid.uuid4())
execution_record = {
'execution_id': execution_id,
'workflow_id': workflow_id,
'status': 'running',
'context': context,
'started_at': datetime.now(),
'steps': []
}
try:
for step in workflow['steps']:
step_result = self._execute_step(step, context)
execution_record['steps'].append({
'step': step['name'],
'result': step_result,
'timestamp': datetime.now()
})
if not step_result.get('success', False):
if step.get('continue_on_failure', False):
continue
else:
execution_record['status'] = 'failed'
break
if execution_record['status'] == 'running':
execution_record['status'] = 'completed'
execution_record['completed_at'] = datetime.now()
except Exception as e:
execution_record['status'] = 'error'
execution_record['error'] = str(e)
finally:
self.db.save_execution(execution_record)
self._notify_execution_result(execution_record)
return execution_record
def _execute_step(self, step, context):
"""执行单个步骤"""
step_type = step.get('type')
if step_type == 'ai_action':
return self._execute_ai_step(step, context)
elif step_type == 'api_call':
return self._execute_api_step(step, context)
elif step_type == 'data_processing':
return self._execute_data_step(step, context)
elif step_type == 'approval':
return self._execute_approval_step(step, context)
else:
raise ValueError(f"未知的步骤类型: {step_type}")
def _execute_ai_step(self, step, context):
"""执行AI步骤"""
prompt_template = step['prompt_template']
prompt = self._render_template(prompt_template, context)
response = self.ai_client.complete(
prompt=prompt,
model=step.get('model', 'gpt-4'),
temperature=step.get('temperature', 0.7)
)
result = self._parse_ai_response(response, step.get('output_format'))
return {
'success': True,
'output': result,
'raw_response': response,
'metadata': {
'model': step.get('model'),
'tokens_used': response.usage.total_tokens
}
}
4.3 知识图谱管理
"""
知识图谱管理模块
"""
class KnowledgeGraphManager:
"""知识图谱管理器"""
def __init__(self, config):
self.config = config
self.graph = self._initialize_graph()
self.embedding_model = self._load_embedding_model()
def add_entity(self, entity_type, properties, relationships=None):
"""添加实体到知识图谱"""
entity_id = f"{entity_type}_{uuid.uuid4().hex[:8]}"
entity = {
'id': entity_id,
'type': entity_type,
'properties': properties,
'created_at': datetime.now(),
'embedding': self._generate_embedding(properties)
}
self.graph.add_node(entity)
if relationships:
for rel in relationships:
self.add_relationship(
source_id=entity_id,
target_id=rel['target_id'],
relationship_type=rel['type'],
properties=rel.get('properties', {})
)
return entity_id
def query_graph(self, query, limit=10):
"""查询知识图谱"""
query_embedding = self._generate_embedding(query)
similar_entities = self.graph.similarity_search(
query_embedding,
limit=limit * 2
)
expanded_results = []
for entity in similar_entities:
related = self.graph.get_neighbors(
entity['id'],
depth=2,
relationship_types=['related_to', 'part_of', 'used_by']
)
expanded_results.extend(related)
results = self._rank_results(expanded_results, query)
return results[:limit]
def generate_insights(self, topic, depth=3):
"""生成洞察"""
related_entities = self.query_graph(topic, limit=20)
insights = []
for entity in related_entities:
centrality = self.graph.calculate_centrality(entity['id'])
patterns = self._discover_patterns(entity['id'], depth)
insight_text = self._generate_insight_text(entity, patterns, centrality)
insights.append({
'entity': entity,
'centrality': centrality,
'patterns': patterns,
'insight': insight_text,
'confidence': self._calculate_confidence(entity, patterns)
})
return sorted(insights, key=lambda x: x['confidence'], reverse=True)
5. 移动端AI助理应用
5.1 Flutter + ChatGPT移动应用
import 'package:flutter/material.dart';
import 'package:speech_to_text/speech_to_text.dart' as stt;
import 'package:flutter_tts/flutter_tts.dart';
import 'package:http/http.dart' as http;
import 'dart:convert';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:file_picker/file_picker.dart';
import 'dart:io';
void main() {
runApp(AIPersonalAssistantApp());
}
class AIPersonalAssistantApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'AI个人助理',
theme: ThemeData(
primarySwatch: Colors.blue,
useMaterial3: true,
),
home: AssistantHomePage(),
);
}
}
class AssistantHomePage extends StatefulWidget {
@override
_AssistantHomePageState createState() => _AssistantHomePageState();
}
class _AssistantHomePageState extends State<AssistantHomePage> {
final stt.SpeechToText _speech = stt.SpeechToText();
final FlutterTts _tts = FlutterTts();
List<Message> _messages = [];
bool _isListening = false;
bool _isProcessing = false;
String _apiKey = '';
TextEditingController _textController = TextEditingController();
ScrollController _scrollController = ScrollController();
@override
void initState() {
super.initState();
_initSpeech();
_initTTS();
_loadApiKey();
_loadHistory();
}
void _initSpeech() async {
bool available = await _speech.initialize();
if (!available) {
print("语音识别不可用");
}
}
void _initTTS() async {
await _tts.setLanguage("zh-CN");
await _tts.setPitch(1.0);
await _tts.setSpeechRate(0.5);
}
void _loadApiKey() async {
SharedPreferences prefs = await SharedPreferences.getInstance();
setState(() {
_apiKey = prefs.getString('api_key') ?? '';
});
}
void _loadHistory() async {
SharedPreferences prefs = await SharedPreferences.getInstance();
String history = prefs.getString('chat_history') ?? '[]';
List<dynamic> historyList = json.decode(history);
setState(() {
_messages = historyList.map((item) => Message.fromJson(item)).toList();
});
}
void _saveHistory() async {
SharedPreferences prefs = await SharedPreferences.getInstance();
String history = json.encode(_messages.map((msg) => msg.toJson()).toList());
await prefs.setString('chat_history', history);
}
void _startListening() async {
if (!_isListening) {
bool available = await _speech.initialize();
if (available) {
setState(() => _isListening = true);
_speech.listen(
onResult: (result) {
setState(() {
_textController.text = result.recognizedWords;
});
if (result.finalResult) {
_sendMessage(result.recognizedWords);
setState(() => _isListening = false);
}
},
localeId: 'zh_CN',
);
}
} else {
setState(() => _isListening = false);
_speech.stop();
}
}
void _sendMessage(String text) async {
if (text.trim().isEmpty) return;
setState(() {
_messages.add(Message(
text: text,
isUser: true,
timestamp: DateTime.now(),
));
_isProcessing = true;
});
_textController.clear();
_scrollToBottom();
try {
String response = await _callChatGPT(text);
setState(() {
_messages.add(Message(
text: response,
isUser: false,
timestamp: DateTime.now(),
));
_isProcessing = false;
});
_saveHistory();
_scrollToBottom();
await _tts.speak(response);
} catch (e) {
setState(() {
_messages.add(Message(
text: "抱歉,出错了: $e",
isUser: false,
timestamp: DateTime.now(),
));
_isProcessing = false;
});
}
}
Future<String> _callChatGPT(String message) async {
if (_apiKey.isEmpty) {
return "请先设置API Key";
}
final response = await http.post(
Uri.parse('https://api.openai.com/v1/chat/completions'),
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer $_apiKey',
},
body: json.encode({
'model': 'gpt-3.5-turbo',
'messages': [
{'role': 'system', 'content': '你是一个有帮助的AI助理'},
{'role': 'user', 'content': message}
],
'temperature': 0.7,
'max_tokens': 1000,
}),
);
if (response.statusCode == 200) {
Map<String, dynamic> data = json.decode(response.body);
return data['choices'][0]['message']['content'];
} else {
throw Exception('API调用失败: ${response.statusCode}');
}
}
void _scrollToBottom() {
WidgetsBinding.instance.addPostFrameCallback((_) {
if (_scrollController.hasClients) {
_scrollController.animateTo(
_scrollController.position.maxScrollExtent,
duration: Duration(milliseconds: 300),
curve: Curves.easeOut,
);
}
});
}
void _pickFile() async {
FilePickerResult? result = await FilePicker.platform.pickFiles();
if (result != null) {
PlatformFile file = result.files.first;
String content = await File(file.path!).readAsString();
_sendMessage("请帮我分析这个文件:\n\n$content");
}
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('AI个人助理'),
actions: [
IconButton(
icon: Icon(Icons.settings),
onPressed: () {
Navigator.push(
context,
MaterialPageRoute(builder: (context) => SettingsPage()),
);
},
),
],
),
body: Column(
children: [
Expanded(
child: ListView.builder(
controller: _scrollController,
itemCount: _messages.length,
itemBuilder: (context, index) {
return MessageBubble(message: _messages[index]);
},
),
),
Container(
padding: EdgeInsets.all(8.0),
color: Theme.of(context).cardColor,
child: Row(
children: [
IconButton(
icon: Icon(
_isListening ? Icons.mic_off : Icons.mic,
color: _isListening ? Colors.red : Colors.blue,
),
onPressed: _startListening,
),
IconButton(
icon: Icon(Icons.attach_file),
onPressed: _pickFile,
),
Expanded(
child: TextField(
controller: _textController,
decoration: InputDecoration(
hintText: '输入消息...',
border: OutlineInputBorder(
borderRadius: BorderRadius.circular(20.0),
),
contentPadding: EdgeInsets.symmetric(
horizontal: 16.0,
vertical: 8.0,
),
),
onSubmitted: _sendMessage,
),
),
IconButton(
icon: Icon(Icons.send),
onPressed: () => _sendMessage(_textController.text),
color: Colors.blue,
),
],
),
),
],
),
floatingActionButton: _isProcessing
? FloatingActionButton(
onPressed: null,
child: CircularProgressIndicator(
valueColor: AlwaysStoppedAnimation<Color>(Colors.white),
),
backgroundColor: Colors.blue,
)
: null,
);
}
}
class Message {
final String text;
final bool isUser;
final DateTime timestamp;
Message({
required this.text,
required this.isUser,
required this.timestamp,
});
factory Message.fromJson(Map<String, dynamic> json) {
return Message(
text: json['text'],
isUser: json['isUser'],
timestamp: DateTime.parse(json['timestamp']),
);
}
Map<String, dynamic> toJson() {
return {
'text': text,
'isUser': isUser,
'timestamp': timestamp.toIso8601String(),
};
}
}
class MessageBubble extends StatelessWidget {
final Message message;
MessageBubble({required this.message});
@override
Widget build(BuildContext context) {
return Container(
margin: EdgeInsets.symmetric(vertical: 4.0, horizontal: 8.0),
child: Row(
crossAxisAlignment: CrossAxisAlignment.start,
children: message.isUser
? [_buildUserMessage(context)]
: [_buildAssistantMessage(context)],
),
);
}
Widget _buildUserMessage(BuildContext context) {
return Expanded(
child: Column(
crossAxisAlignment: CrossAxisAlignment.end,
children: [
Container(
padding: EdgeInsets.all(12.0),
decoration: BoxDecoration(
color: Colors.blue[100],
borderRadius: BorderRadius.only(
topLeft: Radius.circular(15.0),
topRight: Radius.circular(15.0),
bottomLeft: Radius.circular(15.0),
),
),
child: Text(
message.text,
style: TextStyle(fontSize: 16.0),
),
),
Padding(
padding: EdgeInsets.only(top: 4.0),
child: Text(
_formatTime(message.timestamp),
style: TextStyle(fontSize: 12.0, color: Colors.grey),
),
),
],
),
);
}
Widget _buildAssistantMessage(BuildContext context) {
return Expanded(
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
Container(
padding: EdgeInsets.all(12.0),
decoration: BoxDecoration(
color: Colors.grey[200],
borderRadius: BorderRadius.only(
topLeft: Radius.circular(15.0),
topRight: Radius.circular(15.0),
bottomRight: Radius.circular(15.0),
),
),
child: Text(
message.text,
style: TextStyle(fontSize: 16.0),
),
),
Padding(
padding: EdgeInsets.only(top: 4.0),
child: Text(
_formatTime(message.timestamp),
style: TextStyle(fontSize: 12.0, color: Colors.grey),
),
),
],
),
);
}
String _formatTime(DateTime time) {
return '${time.hour.toString().padLeft(2, '0')}:${time.minute.toString().padLeft(2, '0')}';
}
}
class SettingsPage extends StatefulWidget {
@override
_SettingsPageState createState() => _SettingsPageState();
}
class _SettingsPageState extends State<SettingsPage> {
final _apiKeyController = TextEditingController();
@override
void initState() {
super.initState();
_loadApiKey();
}
void _loadApiKey() async {
SharedPreferences prefs = await SharedPreferences.getInstance();
String apiKey = prefs.getString('api_key') ?? '';
setState(() {
_apiKeyController.text = apiKey;
});
}
void _saveApiKey() async {
SharedPreferences prefs = await SharedPreferences.getInstance();
await prefs.setString('api_key', _apiKeyController.text);
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('API Key已保存')),
);
Navigator.pop(context);
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('设置'),
),
body: Padding(
padding: EdgeInsets.all(16.0),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
Text(
'OpenAI API Key',
style: TextStyle(fontSize: 16.0, fontWeight: FontWeight.bold),
),
SizedBox(height: 8.0),
TextField(
controller: _apiKeyController,
decoration: InputDecoration(
hintText: '请输入您的OpenAI API Key',
border: OutlineInputBorder(),
),
obscureText: true,
),
SizedBox(height: 16.0),
Text(
'如何获取API Key:',
style: TextStyle(fontWeight: FontWeight.bold),
),
SizedBox(height: 8.0),
Text('1. 访问 https://platform.openai.com/api-keys'),
Text('2. 登录您的OpenAI账户'),
Text('3. 点击"Create new secret key"'),
Text('4. 复制生成的Key并粘贴到上方'),
SizedBox(height: 24.0),
ElevatedButton(
onPressed: _saveApiKey,
child: Text('保存设置'),
style: ElevatedButton.styleFrom(
minimumSize: Size(double.infinity, 48),
),
),
],
),
),
);
}
}
5.2 Android/iOS原生集成
class MainActivity : AppCompatActivity() {
private lateinit var speechRecognizer: SpeechRecognizer
private lateinit var textToSpeech: TextToSpeech
private lateinit var chatGPT: ChatGPTClient
private lateinit var binding: ActivityMainBinding
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = ActivityMainBinding.inflate(layoutInflater)
setContentView(binding.root)
initializeSpeechRecognition()
initializeTextToSpeech()
initializeChatGPT()
setupUI()
}
private fun initializeSpeechRecognition() {
speechRecognizer = SpeechRecognizer.createSpeechRecognizer(this)
speechRecognizer.setRecognitionListener(object : RecognitionListener {
override fun onReadyForSpeech(params: Bundle?) {
runOnUiThread {
binding.statusText.text = "请说话..."
binding.micButton.setBackgroundColor(Color.RED)
}
}
override fun onBeginningOfSpeech() {
}
override fun onRmsChanged(rmsdB: Float) {
}
override fun onBufferReceived(buffer: ByteArray?) {
}
override fun onEndOfSpeech() {
runOnUiThread {
binding.micButton.setBackgroundColor(Color.BLUE)
}
}
override fun onError(error: Int) {
runOnUiThread {
binding.statusText.text = "识别错误: $error"
}
}
override fun onResults(results: Bundle?) {
val matches = results?.getStringArrayList(SpeechRecognizer.RESULTS_RECOGNITION)
if (!matches.isNullOrEmpty()) {
val text = matches[0]
runOnUiThread {
binding.inputText.setText(text)
sendMessageToAI(text)
}
}
}
override fun onPartialResults(partialResults: Bundle?) {
}
override fun onEvent(eventType: Int, params: Bundle?) {
}
})
}
private fun initializeTextToSpeech() {
textToSpeech = TextToSpeech(this) { status ->
if (status == TextToSpeech.SUCCESS) {
textToSpeech.language = Locale.CHINESE
textToSpeech.setPitch(1.0f)
textToSpeech.setSpeechRate(0.8f)
}
}
}
private fun initializeChatGPT() {
val apiKey = getApiKeyFromPrefs()
chatGPT = ChatGPTClient(apiKey)
}
private fun setupUI() {
binding.micButton.setOnClickListener {
if (!isListening) {
startListening()
} else {
stopListening()
}
}
binding.sendButton.setOnClickListener {
val message = binding.inputText.text.toString()
if (message.isNotEmpty()) {
sendMessageToAI(message)
binding.inputText.text.clear()
}
}
binding.settingsButton.setOnClickListener {
val intent = Intent(this, SettingsActivity::class.java)
startActivity(intent)
}
}
private fun startListening() {
val intent = Intent(RecognizerIntent.ACTION_RECOGNIZE_SPEECH)
intent.putExtra(RecognizerIntent.EXTRA_LANGUAGE_MODEL,
RecognizerIntent.LANGUAGE_MODEL_FREE_FORM)
intent.putExtra(RecognizerIntent.EXTRA_LANGUAGE, "zh-CN")
intent.putExtra(RecognizerIntent.EXTRA_PROMPT, "请说话")
speechRecognizer.startListening(intent)
isListening = true
}
private fun stopListening() {
speechRecognizer.stopListening()
isListening = false
}
private fun sendMessageToAI(message: String) {
runOnUiThread {
addMessageToChat("用户", message)
}
GlobalScope.launch(Dispatchers.IO) {
try {
val response = chatGPT.sendMessage(message)
runOnUiThread {
addMessageToChat("AI助手", response)
textToSpeech.speak(response, TextToSpeech.QUEUE_FLUSH, null, null)
}
} catch (e: Exception) {
runOnUiThread {
addMessageToChat("系统", "错误: ${e.message}")
}
}
}
}
private fun addMessageToChat(sender: String, message: String) {
val chatMessage = "$sender: $message\n\n"
binding.chatHistory.append(chatMessage)
binding.scrollView.post {
binding.scrollView.fullScroll(View.FOCUS_DOWN)
}
}
}
6. 安全与隐私保护
6.1 数据加密方案
"""
数据加密管理器
"""
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import base64
import os
class EncryptionManager:
"""加密管理器"""
def __init__(self, master_key=None):
self.master_key = master_key or self._generate_master_key()
self.fernet = Fernet(self.master_key)
def _generate_master_key(self):
"""生成主密钥"""
salt = os.urandom(16)
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
password = os.environ.get('ENCRYPTION_PASSWORD', 'default_password')
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def encrypt_data(self, data):
"""加密数据"""
if isinstance(data, dict):
data = json.dumps(data)
encrypted = self.fernet.encrypt(data.encode())
return encrypted
def decrypt_data(self, encrypted_data):
"""解密数据"""
try:
decrypted = self.fernet.decrypt(encrypted_data)
return decrypted.decode()
except Exception as e:
raise ValueError(f"解密失败: {str(e)}")
def encrypt_file(self, file_path, output_path=None):
"""加密文件"""
if output_path is None:
output_path = file_path + '.enc'
with open(file_path, 'rb') as f:
data = f.read()
encrypted = self.fernet.encrypt(data)
with open(output_path, 'wb') as f:
f.write(encrypted)
return output_path
def decrypt_file(self, encrypted_path, output_path=None):
"""解密文件"""
if output_path is None:
output_path = encrypted_path.replace('.enc', '')
with open(encrypted_path, 'rb') as f:
encrypted_data = f.read()
decrypted = self.fernet.decrypt(encrypted_data)
with open(output_path, 'wb') as f:
f.write(decrypted)
return output_path
6.2 隐私保护策略
隐私保护策略:
数据收集:
- 明确告知用户收集哪些数据
- 仅收集必要数据
- 提供数据删除选项
数据处理:
- 本地处理优先于云端处理
- 敏感数据本地加密存储
- API调用使用端到端加密
数据存储:
- 用户数据本地存储
- 加密存储敏感信息
- 定期清理临时数据
数据共享:
- 不共享用户数据给第三方
- API调用使用匿名化处理
- 提供数据导出选项
合规性:
- 遵循GDPR规定
- 支持用户数据访问权
- 提供数据删除功能
安全措施:
- 定期安全审计
- 漏洞报告机制
- 数据备份加密
7. 性能优化方案
7.1 缓存策略
"""
智能缓存系统
"""
import time
from functools import wraps
from collections import OrderedDict
import hashlib
import json
class IntelligentCache:
"""智能缓存"""
def __init__(self, max_size=1000, ttl=3600):
self.cache = OrderedDict()
self.max_size = max_size
self.ttl = ttl
self.hits = 0
self.misses = 0
def _generate_key(self, *args, **kwargs):
"""生成缓存键"""
key_parts = []
for arg in args:
if isinstance(arg, (dict, list)):
key_parts.append(json.dumps(arg, sort_keys=True))
else:
key_parts.append(str(arg))
for k, v in sorted(kwargs.items()):
if isinstance(v, (dict, list)):
key_parts.append(f"{k}:{json.dumps(v, sort_keys=True)}")
else:
key_parts.append(f"{k}:{v}")
key_string = "|".join(key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
def get(self, key):
"""获取缓存"""
if key in self.cache:
entry = self.cache[key]
if time.time() - entry['timestamp'] > self.ttl:
del self.cache[key]
self.misses += 1
return None
self.cache.move_to_end(key)
self.hits += 1
return entry['value']
self.misses += 1
return None
def set(self, key, value):
"""设置缓存"""
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = {
'value': value,
'timestamp': time.time(),
'access_count': 0
}
self.cache.move_to_end(key)
def cache_decorator(self, ttl=None):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
skip_cache = kwargs.pop('skip_cache', False)
if skip_cache:
return func(*args, **kwargs)
cache_key = self._generate_key(
func.__name__, *args, **kwargs
)
cached_result = self.get(cache_key)
if cached_result is not None:
return cached_result
result = func(*args, **kwargs)
self.set(cache_key, result)
return result
return wrapper
return decorator
def get_stats(self):
"""获取统计信息"""
hit_rate = self.hits / (self.hits + self.misses) if (self.hits + self.misses) > 0 else 0
return {
'total_entries': len(self.cache),
'hits': self.hits,
'misses': self.misses,
'hit_rate': f"{hit_rate:.2%}",
'memory_usage': self._estimate_memory_usage()
}
def _estimate_memory_usage(self):
"""估计内存使用"""
total_size = 0
for key, value in self.cache.items():
total_size += len(key)
total_size += len(str(value))
return f"{total_size / 1024:.2f} KB"
8. 监控与维护
8.1 健康监控系统
"""
健康监控系统
"""
import psutil
import time
import logging
from datetime import datetime
import threading
import smtplib
from email.mime.text import MIMEText
class HealthMonitor:
"""健康监控"""
def __init__(self, config):
self.config = config
self.metrics = {}
self.alerts = []
self.logger = logging.getLogger(__name__)
self.thresholds = {
'cpu_percent': 80.0,
'memory_percent': 85.0,
'disk_percent': 90.0,
'response_time': 5.0,
'error_rate': 0.05,
}
def start_monitoring(self):
"""开始监控"""
self.logger.info("开始健康监控")
threads = [
threading.Thread(target=self._monitor_system_resources),
threading.Thread(target=self._monitor_application_health),
threading.Thread(target=self._monitor_external_services),
threading.Thread(target=self._process_alerts),
]
for thread in threads:
thread.daemon = True
thread.start()
def _monitor_system_resources(self):
"""监控系统资源"""
while True:
metrics = {
'timestamp': datetime.now().isoformat(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'network_io': psutil.net_io_counters()._asdict(),
}
self.metrics['system'] = metrics
self._check_thresholds(metrics)
time.sleep(60)
def _monitor_application_health(self):
"""监控应用健康"""
while True:
try:
response_time = self._check_api_response_time()
error_rate = self._calculate_error_rate()
metrics = {
'timestamp': datetime.now().isoformat(),
'response_time': response_time,
'error_rate': error_rate,
'active_users': self._get_active_users(),
'api_calls': self._get_api_call_count(),
}
self.metrics['application'] = metrics
self._check_thresholds(metrics)
except Exception as e:
self.logger.error(f"应用健康监控失败: {str(e)}")
time.sleep(30)
def _check_thresholds(self, metrics):
"""检查阈值"""
for metric_name, value in metrics.items():
if metric_name in self.thresholds:
threshold = self.thresholds[metric_name]
if isinstance(value, (int, float)) and value > threshold:
alert = {
'level': 'WARNING',
'metric': metric_name,
'value': value,
'threshold': threshold,
'timestamp': datetime.now().isoformat(),
'message': f"{metric_name} 超过阈值: {value} > {threshold}"
}
self.alerts.append(alert)
self.logger.warning(alert['message'])
def _process_alerts(self):
"""处理告警"""
while True:
if self.alerts:
alert = self.alerts.pop(0)
self._send_alert(alert)
self.logger.error(f"告警: {alert['message']}")
time.sleep(1)
def _send_alert(self, alert):
"""发送告警"""
try:
if self.config.get('email_alerts', False):
self._send_email_alert(alert)
if self.config.get('webhook_alerts', False):
self._send_webhook_alert(alert)
if self.config.get('sms_alerts', False):
self._send_sms_alert(alert)
except Exception as e:
self.logger.error(f"发送告警失败: {str(e)}")
def get_health_status(self):
"""获取健康状态"""
status = {
'timestamp': datetime.now().isoformat(),
'overall_status': 'HEALTHY',
'system': self.metrics.get('system', {}),
'application': self.metrics.get('application', {}),
'alerts': self.alerts[:10],
'statistics': self._get_statistics(),
}
if any(alert['level'] == 'ERROR' for alert in self.alerts):
status['overall_status'] = 'ERROR'
elif any(alert['level'] == 'WARNING' for alert in self.alerts):
status['overall_status'] = 'WARNING'
return status
9. 部署与运维
9.1 一键部署脚本
#!/bin/bash
set -e
echo "🚀 开始部署AI个人助理..."
check_dependencies() {
echo "🔍 检查系统依赖..."
if ! command -v docker &> /dev/null; then
echo "❌ Docker未安装"
exit 1
fi
if ! command -v docker-compose &> /dev/null; then
echo "❌ Docker Compose未安装"
exit 1
fi
echo "✅ 所有依赖已安装"
}
setup_environment() {
echo "⚙️ 配置环境..."
mkdir -p ./data ./knowledge_base ./uploads ./logs ./backups
if [ ! -f .env ]; then
echo "📝 创建环境变量文件..."
cat > .env << EOF
# OpenAI API Key
OPENAI_API_KEY=${OPENAI_API_KEY:-"your-api-key-here"}
# 数据库配置
POSTGRES_USER=assistant
POSTGRES_PASSWORD=$(openssl rand -base64 32)
POSTGRES_DB=ai_assistant
# Redis配置
REDIS_PASSWORD=$(openssl rand -base64 32)
# 应用配置
APP_PORT=8501
APP_ENV=production
APP_DEBUG=false
# 邮件配置(可选)
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your-email@gmail.com
SMTP_PASSWORD=your-password
EOF
echo "⚠️ 请编辑 .env 文件配置您的API Key和其他设置"
fi
}
deploy_application() {
echo "📦 部署应用..."
echo "🔨 构建Docker镜像..."
docker-compose build
echo "🚀 启动服务..."
docker-compose up -d
echo "⏳ 等待服务启动..."
sleep 30
echo "🔍 检查服务状态..."
docker-compose ps
echo "🗃️ 运行数据库迁移..."
docker-compose exec app python manage.py migrate
echo "👤 创建管理员用户..."
docker-compose exec app python manage.py createsuperuser \
--username admin \
--email admin@example.com \
--noinput || true
}
setup_nginx() {
echo "🌐 配置Nginx反向代理..."
if [ "$SETUP_NGINX" = "true" ]; then
if command -v apt-get &> /dev/null; then
sudo apt-get update
sudo apt-get install -y nginx
elif command -v yum &> /dev/null; then
sudo yum install -y nginx
fi
sudo tee /etc/nginx/sites-available/ai-assistant << 'EOF'
server {
listen 80;
server_name ai-assistant.your-domain.com;
location / {
proxy_pass http://localhost:8501;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /static/ {
alias /app/static/;
expires 1y;
add_header Cache-Control "public, immutable";
}
}
EOF
sudo ln -sf /etc/nginx/sites-available/ai-assistant \
/etc/nginx/sites-enabled/
sudo nginx -t
sudo systemctl restart nginx
echo "✅ Nginx配置完成"
fi
}
setup_ssl() {
echo "🔒 设置SSL证书..."
if [ "$SETUP_SSL" = "true" ]; then
if command -v apt-get &> /dev/null; then
sudo apt-get install -y certbot python3-certbot-nginx
elif command -v yum &> /dev/null; then
sudo yum install -y certbot python3-certbot-nginx
fi
sudo certbot --nginx -d ai-assistant.your-domain.com
echo "0 0,12 * * * root python3 -c 'import random; import time; time.sleep(random.random() * 3600)' && certbot renew -q" | sudo tee -a /etc/crontab > /dev/null
echo "✅ SSL证书设置完成"
fi
}
setup_backup() {
echo "💾 设置备份..."
cat > /usr/local/bin/backup-assistant.sh << 'EOF'
BACKUP_DIR="/backups/ai-assistant"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/backup_$DATE.tar.gz"
mkdir -p $BACKUP_DIR
tar -czf $BACKUP_FILE \
--exclude="*.log" \
--exclude="*.pyc" \
--exclude="__pycache__" \
./data \
./knowledge_base \
./uploads \
./docker-compose.yml \
./.env
find $BACKUP_DIR -name "backup_*.tar.gz" -mtime +7 -delete
echo "备份完成: $BACKUP_FILE"
EOF
chmod +x /usr/local/bin/backup-assistant.sh
(crontab -l 2>/dev/null; echo "0 2 * * * /usr/local/bin/backup-assistant.sh") | crontab -
echo "✅ 备份设置完成"
}
main() {
echo "🎯 AI个人助理部署脚本"
echo "======================"
while [[ $# -gt 0 ]]; do
case $1 in
--nginx) SETUP_NGINX=true ;;
--ssl) SETUP_SSL=true ;;
--backup) SETUP_BACKUP=true ;;
*) echo "未知参数: $1" ;;
esac
shift
done
check_dependencies
setup_environment
deploy_application
[ "$SETUP_NGINX" = "true" ] && setup_nginx
[ "$SETUP_SSL" = "true" ] && setup_ssl
[ "$SETUP_BACKUP" = "true" ] && setup_backup
echo ""
echo "🎉 部署完成!"
echo "======================"
echo "访问地址: http://localhost:8501"
[ "$SETUP_NGINX" = "true" ] && echo "域名访问: http://ai-assistant.your-domain.com"
echo ""
echo "📋 后续操作:"
echo "1. 访问Web界面完成初始设置"
echo "2. 配置您的OpenAI API Key"
echo "3. 导入您的个人数据"
echo "4. 开始使用AI助理!"
echo ""
echo "🛠️ 管理命令:"
echo "查看日志: docker-compose logs -f"
echo "停止服务: docker-compose down"
echo "更新应用: docker-compose pull && docker-compose up -d"
echo "备份数据: /usr/local/bin/backup-assistant.sh"
}
main "$@"
10. 总结与展望
10.1 成本效益分析
# AI个人助理成本效益分析
## 初始投入
1. **开发成本**
- 基础版:24小时开发时间
- 进阶版:1-2周开发时间
- 专业版:1-2个月开发时间
2. **硬件成本**
- 云服务器:$10-50/月
- 本地服务器:一次性$500-2000
- 移动设备:已有
3. **软件成本**
- OpenAI API:$0.002/1K tokens (约$10-50/月)
- 其他API:$0-20/月
## 长期效益
1. **时间节省**
- 信息查询:节省1-2小时/天
- 日程管理:节省30分钟/天
- 文件处理:节省1小时/天
- 总节省:2.5-3.5小时/天 ≈ $75-105/天 (按$30/小时计算)
2. **效率提升**
- 决策速度提升:30-50%
- 错误率降低:40-60%
- 知识获取速度:提升5-10倍
3. **投资回报率(ROI)**
- 月成本:$50-100
- 月价值:$1500-3150 (节省的时间价值)
- ROI:1500-3150% (15-31倍回报)
10.2 未来发展方向
短期目标 (1-3个月):
- 多模态支持: 图像识别、视频分析
- 实时协作: 多人共享助理
- 离线功能: 本地模型支持
- 插件生态: 第三方技能扩展
中期目标 (3-12个月):
- 情感智能: 情绪识别与响应
- 预测分析: 主动建议和提醒
- 物联网集成: 智能家居控制
- 增强现实: AR界面交互
长期目标 (1-3年):
- 全自主代理: 完全自主完成任务
- 个性化学习: 深度适应个人习惯
- 脑机接口: 思维直接交互
- 量子计算: 超复杂问题求解
商业化方向:
- 企业版: 团队协作助理
- 教育版: 个性化学习助手
- 医疗版: 健康管理顾问
- 开发者版: 编程辅助工具
10.3 立即行动建议
# 开始打造你的AI个人助理
## 第一步:选择适合你的版本
- **新手入门**: 微信机器人版 (24小时部署)
- **个人使用**: 桌面Web版 (1周完成)
- **团队协作**: 企业版 (1-2个月)
- **移动优先**: 移动应用版 (2-4周)
## 第二步:获取必要资源
1. OpenAI API Key: https://platform.openai.com/api-keys
2. 云服务器: 推荐阿里云/腾讯云入门套餐
3. 域名(可选): 用于专业部署
4. SSL证书(可选): Let's Encrypt免费
## 第三步:逐步实施
第1周: 部署基础版,测试核心功能
第2周: 添加语音和文件处理
第3周: 集成日程管理和知识库
第4周: 优化性能和用户体验
## 第四步:持续优化
- 每周回顾使用情况
- 每月添加新功能
- 每季度进行系统升级
- 每年评估整体效果
## 获取帮助
- 文档: 本教程的所有代码和配置
- 社区: GitHub项目issue讨论
- 更新: 关注AI技术发展
- 定制: 根据需求调整功能
通过本教程,你已经掌握了从零开始打造AI个人助理的全套技术方案。从基础的微信机器人到企业级的全功能系统,你可以根据自身需求和技术水平选择合适的实现路径。
记住,AI个人助理的核心价值在于个性化和实用性。开始构建你的专属助理,让它成为你工作和生活中不可或缺的智能伙伴!