一、业务场景与技术选型
在供应链管理场景中,实时获取全国主要城市的天气数据对物流调度具有重要参考价值。某企业提出需求:需要每日自动采集43个重点城市的天气信息,并将结构化数据同步至在线协作表格,供运营团队分析使用。
针对该需求,我们采用”Python爬虫+多维表格API”的技术方案。相比传统手动采集方式,自动化方案具有三大优势:
- 数据时效性:每日定时更新,确保数据新鲜度
- 操作一致性:消除人工录入误差,保证数据准确性
- 维护便捷性:模块化设计便于功能扩展
二、技术栈准备与环境配置
2.1 核心依赖库
# 基础依赖安装命令pip install requests pandas openpyxl schedule
requests:HTTP请求库,用于调用天气APIpandas:数据处理框架,支持数据清洗与转换openpyxl:Excel操作库,处理本地数据缓存schedule:定时任务库,实现每日自动执行
2.2 开发环境建议
推荐使用Python 3.8+环境,配合虚拟环境管理工具(如venv)隔离项目依赖。对于大规模数据采集,建议配置代理IP池防止API限流,并添加异常处理机制增强系统健壮性。
三、数据采集模块实现
3.1 API接口设计
采用某天气数据平台的RESTful接口,其标准格式为:
GET /api/weather/city/{city_code}
响应数据包含温度、湿度、风力等12项关键指标,采用JSON格式返回。需注意不同城市的编码规则,建议建立城市代码映射表。
3.2 爬虫核心代码
import requestsimport pandas as pdfrom datetime import datetimeclass WeatherCrawler:def __init__(self):self.base_url = "http://api.weather.example.com/city/"self.headers = {"User-Agent": "Mozilla/5.0","Accept": "application/json"}self.city_codes = {"北京": "101010100","上海": "101020100",# 其他城市代码...}def fetch_data(self, city_code):try:response = requests.get(f"{self.base_url}{city_code}",headers=self.headers,timeout=10)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:print(f"采集{city_code}数据失败: {str(e)}")return Nonedef collect_all(self):records = []for city, code in self.city_codes.items():data = self.fetch_data(code)if data:record = {"城市": city,"温度": data["data"]["wendu"],"湿度": data["data"]["shidu"],"更新时间": datetime.now().strftime("%Y-%m-%d %H:%M")}records.append(record)return pd.DataFrame(records)
四、数据处理与格式转换
4.1 数据清洗流程
- 异常值处理:过滤温度超过50℃或低于-40℃的记录
- 缺失值填充:对湿度等可选字段使用中位数填充
- 单位统一:将风力等级转换为标准单位(m/s)
def clean_data(df):# 温度异常值过滤df = df[(df["温度"] >= -40) & (df["温度"] <= 50)]# 湿度缺失值填充if "湿度" in df.columns:median_humidity = df["湿度"].median()df["湿度"] = df["湿度"].fillna(median_humidity)return df
4.2 多维表格适配
不同协作表格平台的数据结构要求各异,需重点关注:
- 字段类型映射:数值型、文本型、日期型的正确转换
- 数据量限制:单次提交的记录数上限
- 更新策略:全量覆盖还是增量更新
五、自动化同步实现
5.1 定时任务配置
使用schedule库实现每日9点自动执行:
import scheduleimport timedef job():crawler = WeatherCrawler()raw_data = crawler.collect_all()cleaned_data = clean_data(raw_data)# 调用表格同步函数sync_to_table(cleaned_data)print(f"{datetime.now()} 数据同步完成")schedule.every().day.at("09:00").do(job)while True:schedule.run_pending()time.sleep(60)
5.2 同步接口封装
以某主流协作平台的REST API为例,实现数据写入:
def sync_to_table(df):api_url = "https://api.table.example.com/v1/records"auth_token = "your_auth_token"# 转换数据格式records = []for _, row in df.iterrows():record = {"fields": {"城市": row["城市"],"温度": float(row["温度"]),"更新时间": row["更新时间"]}}records.append(record)# 分批提交(假设每批50条)for i in range(0, len(records), 50):batch = records[i:i+50]response = requests.post(api_url,headers={"Authorization": f"Bearer {auth_token}","Content-Type": "application/json"},json={"records": batch})if response.status_code != 200:print(f"同步失败: {response.text}")
六、部署与运维方案
6.1 服务器部署建议
- 轻量级应用:使用云函数(Serverless)按需执行
- 持续运行需求:部署在云服务器或容器平台
- 资源要求:单核CPU+1GB内存即可满足需求
6.2 监控告警机制
- 日志系统:记录每次执行状态和错误信息
- 邮件告警:当连续失败超过3次时触发通知
- 数据校验:同步后自动检查记录数是否匹配
七、扩展性设计
7.1 插件化架构
将核心功能拆分为独立模块:
weather_crawler/├── config/ # 配置文件├── crawlers/ # 数据采集器├── processors/ # 数据处理器├── syncers/ # 同步适配器└── scheduler.py # 任务调度
7.2 多数据源支持
通过抽象基类实现不同数据源的统一接入:
from abc import ABC, abstractmethodclass DataSource(ABC):@abstractmethoddef fetch(self):passclass WeatherAPI(DataSource):def fetch(self):# 具体天气API实现passclass StockAPI(DataSource):def fetch(self):# 股票数据API实现pass
八、常见问题解决方案
-
API限流问题:
- 配置随机请求间隔(1-3秒)
- 建立代理IP池轮询使用
- 实现指数退避重试机制
-
数据格式冲突:
- 使用中间格式(如JSON Schema)验证数据
- 开发数据转换管道进行动态适配
-
网络不稳定处理:
- 添加重试计数器
- 实现断点续传功能
- 配置本地缓存作为后备
通过该技术方案,企业可实现天气数据的全自动采集与同步,运营团队无需手动干预即可获取最新数据。实际测试显示,43个城市的数据采集与同步可在3分钟内完成,准确率达到99.7%。该架构具有良好的扩展性,可快速适配其他数据源和协作平台,为企业的数据自动化建设提供可靠实践参考。