多智能体调度系统案例：从自然语言查询到定时信息检索与邮件发送
案例目标技术栈与核心依赖环境配置案例实现案例效果案例实现思路扩展建议总结
案例目标
本案例实现了一个基于多智能体的调度系统#xff0c;能够从自然语言查询中提取时间信息#xff0c;执行定时信息检索#xff0c;并通过邮件发送结果。系统架构由Query Analysis…目录案例目标技术栈与核心依赖环境配置案例实现案例效果案例实现思路扩展建议总结案例目标本案例实现了一个基于多智能体的调度系统能够从自然语言查询中提取时间信息执行定时信息检索并通过邮件发送结果。系统架构由Query Analysis Agent、Search Router、Response Agent和Scheduling System组成实现了从自然语言查询到定时信息检索与邮件发送的全流程自动化。主要功能包括自然语言查询解析与时间信息提取多种搜索类型支持学术论文、新闻、一般搜索任务调度与执行管理搜索结果格式化与邮件发送任务状态跟踪与持久化存储技术栈与核心依赖主要技术栈语言Python 3.xAI框架LangChain, OpenAI GPT-4任务调度Schedule邮件服务Yagmail向量数据库ChromaDB搜索APIarXiv API, NewsAPI, SerpAPI核心依赖库名用途langchain构建AI应用的核心框架langchain-openaiOpenAI模型集成schedule任务调度yagmail邮件发送chromadb向量数据库存储requestsHTTP请求处理urllibURL处理与请求环境配置API密钥配置系统需要配置以下API密钥环境变量配置import os # OpenAI API密钥 os.environ[OPENAI_API_KEY] your_openai_api_key # NewsAPI密钥 os.environ[NEWS_API_KEY] your_news_api_key # SerpAPI密钥 os.environ[SERPAPI_API_KEY] your_serpapi_key邮件配置系统使用Gmail发送邮件需要配置Gmail应用密码注意需要在Google账户中启用两步验证并生成应用密码。 Google账户安全设置邮件配置email_config { username: your_emailgmail.com, password: your_app_password, # Gmail应用密码 }依赖安装requirements.txtlangchain langchain-openai langchain-community schedule yagmail chromadb requests pytz案例实现1. Query Analysis Agent - 查询分析智能体Query Analysis Agent负责解析自然语言查询提取时间信息和搜索需求。QueryAnalysisAgent类from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate from langchain.schema import HumanMessage import json import re from datetime import datetime, timedelta import pytz class QueryAnalysisAgent: def __init__(self): self.llm ChatOpenAI(modelgpt-4o) self.setup_prompts() def setup_prompts(self): # 时间提取提示 self.time_extraction_prompt PromptTemplate.from_template( Extract time information from the following query. 
Query: {query} Return a JSON object with the following fields: - target_time: The specific time mentioned in the query (in YYYY-MM-DD HH:MM:SS format) - execution_time: The time to execute the task (5 minutes before target_time) - time_sensitivity: urgent if the query needs immediate results, normal otherwise If no specific time is mentioned, use current time as target_time. ) # 任务分析提示 self.task_analysis_prompt PromptTemplate.from_template( Analyze the following query to determine search requirements. Query: {query} Time info: {time_info} Return a JSON object with the following fields: - task_type: search or other task types - search_type: research_paper, news, or general - keywords: List of relevant keywords for the search - requirements: Any specific requirements mentioned - time_sensitivity: urgent or normal ) def extract_time(self, query: str) - dict: 从查询中提取时间信息 try: # 使用LLM提取时间信息 response self.time_extraction_prompt.format(queryquery) response self.llm.invoke(response) time_info json.loads(response.content) # 处理时间信息 if target_time in time_info: # 如果是相对时间如7 AM转换为具体时间 target_time self._parse_relative_time(time_info[target_time]) time_info[target_time] target_time # 执行时间设为目标时间前5分钟 execution_time target_time - timedelta(minutes5) time_info[execution_time] execution_time return time_info except Exception as e: # 默认处理当前时间1小时作为目标时间 now datetime.now(pytz.utc) target_time now timedelta(hours1) execution_time target_time - timedelta(minutes5) return { target_time: target_time, execution_time: execution_time, time_sensitivity: normal } def analyze_task(self, query: str, time_info: dict) - dict: 分析查询任务类型和搜索需求 try: # 使用LLM分析任务 response self.task_analysis_prompt.format( queryquery, time_infojson.dumps(time_info) ) response self.llm.invoke(response) task_info json.loads(response.content) # 添加原始查询和状态 task_info[original_query] query task_info[status] success return task_info except Exception as e: # 默认处理一般搜索 return { task_type: search, search_type: general, keywords: 
query.split(), requirements: minimum 5 results, time_sensitivity: normal, original_query: query, status: success } def analyze_query(self, query: str) - dict: 完整的查询分析流程 # 提取时间信息 time_info self.extract_time(query) # 分析任务需求 task_info self.analyze_task(query, time_info) # 合并结果 result {**time_info, **task_info} return result def _parse_relative_time(self, time_str: str) - datetime: 解析相对时间字符串为具体时间 now datetime.now(pytz.utc) # 处理常见时间格式 # 例如: 7 AM, 3 PM, 14:15 time_match re.search(r(\d{1,2})(?::(\d{2}))?\s*(AM|PM|am|pm)?, time_str) if time_match: hour int(time_match.group(1)) minute int(time_match.group(2)) if time_match.group(2) else 0 period time_match.group(3) # 处理AM/PM if period and period.upper() PM and hour ! 12: hour 12 elif period and period.upper() AM and hour 12: hour 0 # 创建目标时间今天 target_time now.replace(hourhour, minuteminute, second0, microsecond0) # 如果目标时间已过则设为明天 if target_time now: target_time timedelta(days1) return target_time # 默认返回当前时间1小时 return now timedelta(hours1)2. Search Router - 搜索路由器Search Router作为智能交通控制器根据查询分析将请求路由到最合适的专业搜索代理。SearchRouter类import urllib.request import xml.etree.ElementTree as ET import requests from urllib.parse import urlencode from langchain_community.utilities import SerpAPIWrapper from typing import Dict, Any, List from datetime import datetime, timedelta import pytz class SearchRouter: def __init__(self): # 初始化专业搜索代理 self.paper_search_agent PaperSearchAgent() self.news_search_agent NewsSearchAgent() self.general_search_agent GeneralSearchAgent() def route_and_search(self, query_analysis: dict) - dict: 根据查询分析将搜索请求路由到合适的搜索代理 Args: query_analysis (dict): QueryAnalysisAgent的查询分析结果 Returns: dict: 包含搜索结果的字典包括成功/失败状态和相关信息 try: # 检查搜索类型 search_type query_analysis.get(search_type) # 记录开始时间用于日志记录 start_time datetime.now(pytz.utc) # 执行搜索 if search_type research_paper: print(Performing research paper search...) result self.paper_search_agent.perform_search(query_analysis) elif search_type news: print(Performing news search...) 
result self.news_search_agent.perform_search(query_analysis) elif search_type general: print(Performing general search...) result self.general_search_agent.perform_search(query_analysis) else: return { status: error, error_message: fUnsupported search type: {search_type}, original_query: query_analysis.get(original_query), } # 计算搜索持续时间 end_time datetime.now(pytz.utc) search_duration (end_time - start_time).total_seconds() # 添加元数据到结果 result.update({ search_type: search_type, search_duration: search_duration, search_timestamp: end_time.isoformat(), original_query: query_analysis.get(original_query), }) return result except Exception as e: return { status: error, error_message: str(e), original_query: query_analysis.get(original_query), search_type: query_analysis.get(search_type), }3. 专业搜索代理系统包含三种专业搜索代理PaperSearchAgent、NewsSearchAgent和GeneralSearchAgent。3.1 PaperSearchAgent - 学术论文搜索代理PaperSearchAgent类class PaperSearchAgent: def __init__(self): self.base_url http://export.arxiv.org/api/query def perform_search(self, query_info: Dict[str, Any]) - Dict[str, Any]: try: keywords self._process_keywords(query_info[keywords]) max_results self._extract_max_results(query_info.get(requirements, )) url f{self.base_url}?search_queryall:{keywords}start0max_results{max_results} response urllib.request.urlopen(url) data response.read().decode(utf-8) results self._parse_arxiv_results(data) return { status: success, results: results, total_found: len(results), returned_count: len(results), query_info: query_info, } except Exception as e: return { status: error, error_message: str(e), query_info: query_info, } def _process_keywords(self, keywords: List[str]) - str: # 移除时间相关关键词 filtered_keywords [ k for k in keywords if not any( time in k.lower() for time in [hour, morning, afternoon, evening] ) ] return .join(filtered_keywords) def _extract_max_results(self, requirements: str) - int: import re # 提取数字 numbers re.findall(r\d, requirements) return int(numbers[0]) if numbers else 5 def 
_parse_arxiv_results(self, data: str) - List[Dict[str, Any]]: root ET.fromstring(data) results [] for entry in root.findall({http://www.w3.org/2005/Atom}entry): title entry.find({http://www.w3.org/2005/Atom}title).text url entry.find({http://www.w3.org/2005/Atom}id).text published entry.find({http://www.w3.org/2005/Atom}published).text summary entry.find({http://www.w3.org/2005/Atom}summary).text results.append({ type: research_paper, title: title, url: url, published_date: published[:10], summary: summary, source: arxiv, }) return results3.2 NewsSearchAgent - 新闻搜索代理NewsSearchAgent类class NewsSearchAgent: def __init__(self, api_key: str None): 使用NewsAPI初始化新闻搜索代理。 NewsAPI遵循REST API结构基础URL为https://newsapi.org/v2。 它提供两个主要端点 - /everything: 搜索整个新闻档案。 - /top-headlines: 获取最新的头条新闻。 self.api_key os.environ[NEWS_API_KEY] if not self.api_key: raise ValueError(NewsAPI key is required) self.base_url https://newsapi.org/v2 def perform_search( self, query_info: Dict[str, Any], max_results: int 5 ) - Dict[str, Any]: 根据给定的查询信息执行新闻搜索。 Args: query_info (Dict[str, Any]): 包含搜索参数的字典。 max_results (int): 返回的最大结果数。 Returns: Dict[str, Any]: 包含搜索结果或错误消息的字典。 try: # 提取实际搜索词排除时间相关关键词 search_terms query_info.get( search_terms, query_info.get(keywords, []) ) # 检查是否为实时新闻搜索 is_realtime query_info.get(time_sensitivity) urgent from_date datetime.now() - timedelta(hours1 if is_realtime else 24) # 配置everything端点的参数 params { q: .join(search_terms), # 排除时间相关关键词 from: from_date.strftime(%Y-%m-%d), sortBy: publishedAt, language: en, apiKey: self.api_key, } # 构建API请求URL url f{self.base_url}/everything?{urlencode(params)} # 发送API请求 response requests.get(url) data response.json() # 检查响应状态 if response.status_code ! 
200: return { status: error, error_message: data.get(message, Unknown error), query_info: query_info, } # 处理和格式化结果 articles data.get(articles, []) formatted_results [] for article in articles[:max_results]: formatted_results.append({ title: article.get(title), description: article.get(description), url: article.get(url), published_at: article.get(publishedAt), source: article.get(source, {}).get(name), content: article.get(content), }) return { status: success, results: formatted_results, total_results: data.get(totalResults, 0), returned_count: len(formatted_results), search_parameters: { keywords: query_info[keywords], from_date: from_date.strftime(%Y-%m-%d), language: en, }, } except Exception as e: return { status: error, error_message: str(e), query_info: query_info, } def get_top_headlines( self, country: str us, category: str None ) - Dict[str, Any]: 获取头条新闻。 Args: country (str): 国家代码默认us代表美国。 category (str, optional): 新闻类别例如business, technology。 Returns: Dict[str, Any]: 包含头条新闻的字典。 params {country: country, apiKey: self.api_key} if category: params[category] category url f{self.base_url}/top-headlines?{urlencode(params)} response requests.get(url) return response.json()3.3 GeneralSearchAgent - 一般搜索代理GeneralSearchAgent类class GeneralSearchAgent: def __init__(self, serpapi_key: str None): if serpapi_key: os.environ[SERPAPI_API_KEY] serpapi_key self.search SerpAPIWrapper() def setup_search_parameters(self, query_info: Dict[str, Any]) - List[str]: 为一般搜索构建搜索查询。 keywords .join(query_info[keywords]) # 设置基础搜索查询 search_queries [ f{keywords} lang:ko, # 韩语结果 keywords, # 一般搜索 ] return search_queries def perform_search( self, query_info: Dict[str, Any], max_results: int 5 ) - Dict[str, Any]: 执行一般搜索并返回结果。 try: search_queries self.setup_search_parameters(query_info) all_results [] for query in search_queries: raw_results self.search.run(query) parsed_results self._parse_general_results(raw_results) all_results.extend(parsed_results) # 按相关性分数排序 sorted_results sorted( 
all_results, keylambda x: x.get(relevance_score, 0), reverseTrue )[:max_results] return { status: success, results: sorted_results, total_found: len(all_results), returned_count: len(sorted_results), query_info: query_info, } except Exception as e: return { status: error, error_message: str(e), query_info: query_info, } def _parse_general_results(self, raw_results: str) - List[Dict[str, Any]]: 解析一般搜索结果。 parsed_results [] for result in raw_results.split(\n): if not result.strip(): continue parsed_results.append({ type: general, title: self._extract_title(result), content: result, url: self._extract_url(result), relevance_score: self._calculate_relevance(result), }) return parsed_results def _extract_title(self, result: str) - str: 从结果中提取标题。 return result.split(.)[0].strip()[:100] def _extract_url(self, result: str) - str: 从结果中提取URL。 import re urls re.findall(rhttps?://(?:[-\w.]|(?:%[\da-fA-F]{2}))[^\s]*, result) return urls[0] if urls else def _calculate_relevance(self, result: str) - float: 计算搜索结果的相关性分数。 relevance_score 0.5 # 基础分数 # 基于关键词匹配计算分数 keywords [official, guide, tutorial, review, recommendation] lower_result result.lower() for keyword in keywords: if keyword in lower_result: relevance_score 0.1 return min(relevance_score, 1.0) # 确保分数不超过1.04. Response Agent - 响应代理Response Agent作为系统的专家通信者将原始搜索结果转换为结构良好、可读的内容满足用户需求。ResponseAgent类from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate import json class ResponseAgent: def __init__(self): self.llm ChatOpenAI(modelgpt-4o) self.setup_prompts() def setup_prompts(self): # 研究论文搜索提示 self.paper_prompt PromptTemplate.from_template( Please organize the following research paper search results in email format. Search term: {query} Search results: {results} Format as follows: 1. Email subject: Research Paper Search Results for [search term] 2. Body: - Greeting - Here are the organized research paper results for your search. - Number each paper and format as follows: 1. 
Paper title: [title] - Summary: [core research content and key findings] - URL: [link] - Closing remarks ) # 新闻搜索提示 self.news_prompt PromptTemplate.from_template( Please organize the following news search results in email format. Search term: {query} Search results: {results} Format as follows: 1. Email subject: Latest News Updates for [search term] 2. Body: - Greeting - Here are the latest news articles related to your search topic. - Number each news item and format as follows: 1. [title] - [news source] - Main content: [key content summary] - Published date: [date] - URL: [link] - Closing remarks ) # 一般搜索提示 self.general_prompt PromptTemplate.from_template( Please organize the following search results in email format. Search term: {query} Search results: {results} Format as follows: 1. Email subject: Search Results for [search term] 2. Body: - Greeting - Here are the organized results for your search. - Number each result and format as follows: 1. [title] - Content: [main content summary] - Source: [website or platform name] - URL: [link] - Closing remarks ) def format_results(self, search_results): try: # 处理无结果或错误的情况 if search_results.get(status) error: return { subject: Search Error Notification, body: fAn error occurred during search: {search_results.get(error_message, Unknown error)}, } # 处理无结果的情况 if not search_results.get(results): return { subject: No Search Results, body: fNo results found for search term {search_results.get(original_query, )}., } # 根据搜索类型选择提示 search_type search_results.get(search_type, general) if search_type research_paper: prompt self.paper_prompt elif search_type news: prompt self.news_prompt else: prompt self.general_prompt # 准备结果格式化的输入 formatted_input { query: search_results.get(original_query, ), results: json.dumps( search_results.get(results, []), ensure_asciiFalse, indent2 ), } # 通过LLM生成响应 response prompt.format(**formatted_input) response self.llm.invoke(response) try: # 尝试JSON解析 return json.loads(response.content) except 
json.JSONDecodeError: # 如果JSON解析失败返回默认格式 return { subject: fSearch Results for [{formatted_input[query]}], body: response.content, } except Exception as e: return { subject: Result Processing Error, body: fAn error occurred while processing results: {str(e)}\n\nOriginal query: {search_results.get(original_query, )}, }5. Scheduled Search System - 调度搜索系统ScheduledSearchSystem管理搜索任务的完整生命周期从调度到结果交付。ScheduledSearchSystem类import schedule import time import yagmail import chromadb from chromadb.config import Settings import threading import time from queue import Queue class ScheduledSearchSystem: def __init__(self, email_config: Dict[str, Any]): print(f\n[{self._get_current_time()}] Initializing system...) self.query_analyzer QueryAnalysisAgent() self.search_router SearchRouter() self.response_agent ResponseAgent() self.client chromadb.PersistentClient(path./search_data) # 邮件配置 self.email_config email_config self.yag yagmail.SMTP(email_config[username], email_config[password]) print(f[{self._get_current_time()}] Email client configuration complete) self.scheduled_tasks {} self.setup_collections() # 添加完成标志 self.is_completed False self.completion_event threading.Event() # 启动调度器 self.scheduler_thread threading.Thread(targetself._run_scheduler) self.scheduler_thread.daemon True self.scheduler_thread.start() print(f[{self._get_current_time()}] System initialization complete\n) def _get_current_time(self): 返回当前时间字符串 return datetime.now().strftime(%Y-%m-%d %H:%M:%S) def setup_collections(self): 设置ChromaDB集合 print(f[{self._get_current_time()}] Starting ChromaDB collection setup...) 
self.results_collection self.client.get_or_create_collection( namesearch_results, metadata{description: Raw search results} ) self.formatted_collection self.client.get_or_create_collection( nameformatted_responses, metadata{description: Formatted responses for email delivery}, ) self.schedule_collection self.client.get_or_create_collection( namescheduled_tasks, metadata{description: Scheduled search and email tasks}, ) print(f[{self._get_current_time()}] ChromaDB collection setup complete) def schedule_task(self, query: str, user_email: str) - Dict[str, Any]: 调度任务 try: print(f\n[{self._get_current_time()}] Starting new task scheduling...) print(fQuery: {query}) print(fEmail: {user_email}) # 查询分析 query_analysis self.query_analyzer.analyze_query(query) execution_time query_analysis[execution_time] target_time query_analysis[target_time] print(fScheduled search execution time: {execution_time}) print(fScheduled email delivery time: {target_time}) # 生成任务ID schedule_id ftask_{datetime.now(pytz.UTC).timestamp()} print(fGenerated task ID: {schedule_id}) # 保存任务信息 task_info { query: query, email: user_email, execution_time: execution_time.isoformat(), target_time: target_time.isoformat(), search_type: query_analysis[search_type], status: scheduled, } self.scheduled_tasks[schedule_id] task_info # 保存到ChromaDB print( f[{self._get_current_time()}] Saving task information to ChromaDB... 
) self.schedule_collection.add( documents[json.dumps(task_info)], metadatas[{type: schedule, status: pending}], ids[schedule_id], ) print(f[{self._get_current_time()}] Task information saved) # 调度搜索执行 execution_time_str execution_time.strftime(%H:%M) schedule.every().day.at(execution_time_str).do( self.execute_search, schedule_idschedule_id ).tag(schedule_id) print(f[{self._get_current_time()}] Search task scheduling complete) return { status: success, message: Task successfully scheduled, schedule_id: schedule_id, execution_time: execution_time, target_time: target_time, } except Exception as e: print(f[{self._get_current_time()}] Task scheduling failed: {str(e)}) return {status: error, error_message: str(e)} def execute_search(self, schedule_id: str) - bool: 执行搜索 try: print( f\n[{self._get_current_time()}] Starting search execution (ID: {schedule_id}) ) task_info self.scheduled_tasks.get(schedule_id) if not task_info: print(f[{self._get_current_time()}] Task information not found) return False print(f[{self._get_current_time()}] Analyzing search query...) query_analysis self.query_analyzer.analyze_query(task_info[query]) print(f[{self._get_current_time()}] Performing search...) search_results self.search_router.route_and_search(query_analysis) print(f[{self._get_current_time()}] Formatting search results...) formatted_response self.response_agent.format_results(search_results) # 保存结果 print(f[{self._get_current_time()}] Saving search results to ChromaDB...) 
response_id fresponse_{schedule_id} self.formatted_collection.add( documents[json.dumps(formatted_response)], metadatas[ { schedule_id: schedule_id, email: task_info[email], target_time: task_info[target_time], } ], ids[response_id], ) print(f[{self._get_current_time()}] Search results saved) # 调度邮件发送 target_time datetime.fromisoformat(task_info[target_time]) target_time_str target_time.strftime(%H:%M) schedule.every().day.at(target_time_str).do( self.send_email, schedule_idschedule_id ).tag(femail_{schedule_id}) print( f[{self._get_current_time()}] Email delivery scheduled (Time: {target_time_str}) ) return True except Exception as e: print(f[{self._get_current_time()}] Search execution failed: {str(e)}) return False def send_email(self, schedule_id: str) - bool: 发送邮件 try: print( f\n[{self._get_current_time()}] Starting email delivery (ID: {schedule_id}) ) response_id fresponse_{schedule_id} print(f[{self._get_current_time()}] Retrieving saved search results...) response_results self.formatted_collection.get(ids[response_id]) if not response_results[documents]: print(f[{self._get_current_time()}] Search results not found) return False formatted_response json.loads(response_results[documents][0]) metadata response_results[metadatas][0] print(f[{self._get_current_time()}] Sending email...) print(fRecipient: {metadata[email]}) print(fSubject: {formatted_response[subject]}) self.yag.send( tometadata[email], subjectformatted_response[subject], contentsformatted_response[body], ) print(f[{self._get_current_time()}] Email sent successfully) # 更新任务状态 print(f[{self._get_current_time()}] Updating task status...) 
task_info self.scheduled_tasks[schedule_id] task_info[status] completed self.schedule_collection.update( documents[json.dumps(task_info)], ids[schedule_id] ) # 清除调度 schedule.clear(femail_{schedule_id}) print(f[{self._get_current_time()}] Task completion processing complete\n) # 设置完成标志 self.is_completed True self.completion_event.set() print( f[{self._get_current_time()}] All tasks completed. Shutting down system.\n ) return True except Exception as e: print(f[{self._get_current_time()}] Email delivery failed: {str(e)}) return False def _run_scheduler(self): 运行调度器 print(f[{self._get_current_time()}] Scheduler started...) while not self.is_completed: schedule.run_pending() time.sleep(1) # 每秒检查一次 def wait_for_completion(self, timeoutNone): 等待任务完成 try: completed self.completion_event.wait(timeouttimeout) if not completed: print(f[{self._get_current_time()}] Task completion timeout) if hasattr(self, yag): self.yag.close() except KeyboardInterrupt: print(f\n[{self._get_current_time()}] Terminated by user.) if hasattr(self, yag): self.yag.close()6. 系统使用示例系统使用示例# 系统配置 email_config { username: your_emailgmail.com, password: your_app_password, } print(\n Starting Scheduling System Test \n) # 初始化系统 system ScheduledSearchSystem(email_config) # 调度任务 result system.schedule_task( queryfind Modular RAG paper at 14:15 PM, # 示例下午2:15 user_emailrecipientgmail.com, ) print(\n Scheduling Result ) print(json.dumps(result, indent2, ensure_asciiFalse)) print(\n Task will execute at scheduled time... \n) # 等待任务完成最多4小时 system.wait_for_completion(timeout14400)7. 
测试函数测试函数def datetime_handler(obj): 处理程序将datetime对象转换为JSON可序列化的字符串。 Args: obj: 要转换的对象。 Returns: str: 格式化的datetime字符串格式YYYY-MM-DD HH:MM:SSZZZZ。 Raises: TypeError: 如果对象不是datetime实例则引发。 if isinstance(obj, datetime): return obj.strftime(%Y-%m-%d %H:%M:%S%z) raise TypeError(fObject of type {type(obj)} is not JSON serializable) def test_search_system(): 测试搜索系统 # 初始化组件 query_analyzer QueryAnalysisAgent() search_router SearchRouter() response_agent ResponseAgent() # 测试查询 test_queries [ recommend place to eat at seoul in 7 pm, find rag persona paper at 3 pm, find news us president speech in 7 am, ] for query in test_queries: print(f\n{*60}) print(fTest Query: {query}) print(f{*60}) try: query_analysis query_analyzer.analyze_query(query) print(\n1. Query Analysis Results:) print( json.dumps( query_analysis, indent2, ensure_asciiFalse, defaultdatetime_handler, ) ) if query_analysis[status] ! success: print(Query analysis failed!) continue search_results search_router.route_and_search(query_analysis) print(\n2. Search Results:) print( json.dumps( search_results, indent2, ensure_asciiFalse, defaultdatetime_handler, ) ) # 添加结果格式化 print(\n3. 
Formatted Results:) formatted_results response_agent.format_results(search_results) print(json.dumps(formatted_results, indent2, ensure_asciiFalse)) except Exception as e: print(fError occurred during test: {str(e)}) if __name__ __main__: test_search_system()案例效果多智能体调度系统实现了以下效果自然语言理解系统能够解析包含时间信息的自然语言查询如find rag persona paper at 3 pm智能搜索路由根据查询内容自动选择最合适的搜索类型学术论文、新闻或一般搜索定时任务执行在指定时间自动执行搜索任务比目标时间提前5分钟开始执行结果格式化将原始搜索结果转换为结构化、易读的邮件格式自动邮件发送在目标时间自动发送格式化后的搜索结果到指定邮箱任务状态跟踪使用ChromaDB持久化存储任务信息和搜索结果示例输出查询分析结果{ target_time: 2025-02-08 15:00:000000, execution_time: 2025-02-08 14:55:000000, task_type: search, search_type: research_paper, keywords: [ rag, persona, paper, 3, pm ], requirements: minimum 5 results, time_sensitivity: normal, original_query: find rag persona paper at 3 pm, status: success }格式化邮件内容{ subject: Search Results for [find rag persona paper at 3 pm], body: Subject: Search Results for \find rag persona paper at 3 pm\\n\n---\n\nHello,\n\nHere are the organized research paper results for your search:\n\n1. Paper title: PeaCoK: Persona Commonsense Knowledge for Consistent and Engaging Narratives\n - Summary: This paper introduces PeaCoK, a method for incorporating persona-based commonsense knowledge into narrative generation systems...\n - URL: http://arxiv.org/abs/2305.02364v2\n\n2. Paper title: RAG vs. Fine-tuning: Pipelines, Tradeoffs, and a Case Study on Agriculture\n - Summary: This paper presents a comprehensive comparison between RAG and fine-tuning approaches...\n - URL: http://arxiv.org/abs/2401.15868\n\n...\n\nThank you for using our service. Should you have any further questions or need additional information, feel free to reach out.\n\nBest regards,\n\n[Your Name] }案例实现思路多智能体调度系统的实现基于以下思路1. 模块化设计系统采用模块化设计将功能分解为独立的智能体组件QueryAnalysisAgent负责自然语言理解和查询分析SearchRouter负责根据查询类型路由到合适的搜索代理专业搜索代理负责执行特定类型的搜索任务ResponseAgent负责结果格式化和内容生成ScheduledSearchSystem负责任务调度和系统协调2. 智能体协作各智能体通过标准化的数据格式和接口进行协作使用JSON格式传递查询分析结果统一的搜索结果格式便于后续处理标准化的邮件格式确保一致的用户体验3. 
时间管理系统实现精确的时间管理机制从自然语言查询中提取时间信息设置执行时间比目标时间提前5分钟使用Schedule库实现定时任务调度支持相对时间如7 AM和绝对时间4. 数据持久化使用ChromaDB实现数据持久化存储任务信息和状态保存搜索结果和格式化响应支持任务状态跟踪和历史查询5. 异步处理系统采用异步处理机制使用线程实现非阻塞操作调度器在后台持续运行支持多个并发任务调度6. 错误处理实现全面的错误处理机制每个组件都有异常捕获和处理提供有意义的错误信息确保系统稳定性和可靠性扩展建议基于当前实现可以考虑以下扩展方向1. 增强搜索能力添加更多专业搜索代理如专利搜索、代码搜索等实现搜索结果去重和相关性排序优化支持多语言搜索和结果翻译添加搜索结果缓存机制提高响应速度2. 提升智能体能力增强QueryAnalysisAgent的时间解析能力支持更复杂的时间表达式改进ResponseAgent支持更多输出格式如PDF、Word等添加搜索结果摘要和关键信息提取功能实现基于用户偏好的个性化结果排序3. 扩展调度功能支持周期性任务调度如每天、每周等添加任务优先级管理和资源分配实现任务依赖关系和复杂工作流添加任务执行监控和性能统计4. 增强通知机制支持多种通知方式如短信、即时消息等添加通知模板自定义功能实现通知优先级和分组管理添加通知历史记录和统计5. 用户界面与交互开发Web界面提供图形化任务管理添加任务进度实时显示功能实现用户认证和权限管理提供API接口支持第三方集成6. 系统优化实现分布式架构提高系统可扩展性添加负载均衡和故障转移机制优化资源使用提高系统效率添加全面的日志记录和监控功能总结多智能体调度系统是一个综合性的AI应用案例展示了如何将多个智能体组件协同工作实现从自然语言查询到定时信息检索与邮件发送的完整自动化流程。该案例的核心价值在于智能体协作通过多个专业智能体的协作实现复杂任务的自动化处理自然语言理解利用LLM的能力解析自然语言查询提取关键信息任务调度实现精确的时间管理和任务调度满足用户定时需求结果处理将原始搜索结果转换为用户友好的格式并通过邮件发送该案例可以应用于多种场景如研究人员定时获取最新论文商务人士定期获取行业新闻个人用户定时获取特定主题信息企业自动化信息收集与分发通过进一步扩展和优化该系统可以成为一个强大的信息管理和自动化工具为用户提供个性化的信息服务。