Python+自动化工具实现数据采集与多维表格同步

一、业务场景与技术选型

在供应链管理场景中,实时获取全国主要城市的天气数据对物流调度具有重要参考价值。某企业提出需求:需要每日自动采集43个重点城市的天气信息,并将结构化数据同步至在线协作表格,供运营团队分析使用。

针对该需求,我们采用”Python爬虫+多维表格API”的技术方案。相比传统手动采集方式,自动化方案具有三大优势:

  1. 数据时效性:每日定时更新,确保数据新鲜度
  2. 操作一致性:消除人工录入误差,保证数据准确性
  3. 维护便捷性:模块化设计便于功能扩展

二、技术栈准备与环境配置

2.1 核心依赖库

  1. # 基础依赖安装命令
  2. pip install requests pandas openpyxl schedule
  • requests:HTTP请求库,用于调用天气API
  • pandas:数据处理框架,支持数据清洗与转换
  • openpyxl:Excel操作库,处理本地数据缓存
  • schedule:定时任务库,实现每日自动执行

2.2 开发环境建议

推荐使用Python 3.8+环境,配合虚拟环境管理工具(如venv)隔离项目依赖。对于大规模数据采集,建议配置代理IP池防止API限流,并添加异常处理机制增强系统健壮性。

三、数据采集模块实现

3.1 API接口设计

采用某天气数据平台的RESTful接口,其标准格式为:

  1. GET /api/weather/city/{city_code}

响应数据包含温度、湿度、风力等12项关键指标,采用JSON格式返回。需注意不同城市的编码规则,建议建立城市代码映射表。

3.2 爬虫核心代码

  1. import requests
  2. import pandas as pd
  3. from datetime import datetime
  4. class WeatherCrawler:
  5. def __init__(self):
  6. self.base_url = "http://api.weather.example.com/city/"
  7. self.headers = {
  8. "User-Agent": "Mozilla/5.0",
  9. "Accept": "application/json"
  10. }
  11. self.city_codes = {
  12. "北京": "101010100",
  13. "上海": "101020100",
  14. # 其他城市代码...
  15. }
  16. def fetch_data(self, city_code):
  17. try:
  18. response = requests.get(
  19. f"{self.base_url}{city_code}",
  20. headers=self.headers,
  21. timeout=10
  22. )
  23. response.raise_for_status()
  24. return response.json()
  25. except requests.exceptions.RequestException as e:
  26. print(f"采集{city_code}数据失败: {str(e)}")
  27. return None
  28. def collect_all(self):
  29. records = []
  30. for city, code in self.city_codes.items():
  31. data = self.fetch_data(code)
  32. if data:
  33. record = {
  34. "城市": city,
  35. "温度": data["data"]["wendu"],
  36. "湿度": data["data"]["shidu"],
  37. "更新时间": datetime.now().strftime("%Y-%m-%d %H:%M")
  38. }
  39. records.append(record)
  40. return pd.DataFrame(records)

四、数据处理与格式转换

4.1 数据清洗流程

  1. 异常值处理:过滤温度超过50℃或低于-40℃的记录
  2. 缺失值填充:对湿度等可选字段使用中位数填充
  3. 单位统一:将风力等级转换为标准单位(m/s)
  1. def clean_data(df):
  2. # 温度异常值过滤
  3. df = df[(df["温度"] >= -40) & (df["温度"] <= 50)]
  4. # 湿度缺失值填充
  5. if "湿度" in df.columns:
  6. median_humidity = df["湿度"].median()
  7. df["湿度"] = df["湿度"].fillna(median_humidity)
  8. return df

4.2 多维表格适配

不同协作表格平台的数据结构要求各异,需重点关注:

  • 字段类型映射:数值型、文本型、日期型的正确转换
  • 数据量限制:单次提交的记录数上限
  • 更新策略:全量覆盖还是增量更新

五、自动化同步实现

5.1 定时任务配置

使用schedule库实现每日9点自动执行:

  1. import schedule
  2. import time
  3. def job():
  4. crawler = WeatherCrawler()
  5. raw_data = crawler.collect_all()
  6. cleaned_data = clean_data(raw_data)
  7. # 调用表格同步函数
  8. sync_to_table(cleaned_data)
  9. print(f"{datetime.now()} 数据同步完成")
  10. schedule.every().day.at("09:00").do(job)
  11. while True:
  12. schedule.run_pending()
  13. time.sleep(60)

5.2 同步接口封装

以某主流协作平台的REST API为例,实现数据写入:

  1. def sync_to_table(df):
  2. api_url = "https://api.table.example.com/v1/records"
  3. auth_token = "your_auth_token"
  4. # 转换数据格式
  5. records = []
  6. for _, row in df.iterrows():
  7. record = {
  8. "fields": {
  9. "城市": row["城市"],
  10. "温度": float(row["温度"]),
  11. "更新时间": row["更新时间"]
  12. }
  13. }
  14. records.append(record)
  15. # 分批提交(假设每批50条)
  16. for i in range(0, len(records), 50):
  17. batch = records[i:i+50]
  18. response = requests.post(
  19. api_url,
  20. headers={
  21. "Authorization": f"Bearer {auth_token}",
  22. "Content-Type": "application/json"
  23. },
  24. json={"records": batch}
  25. )
  26. if response.status_code != 200:
  27. print(f"同步失败: {response.text}")

六、部署与运维方案

6.1 服务器部署建议

  • 轻量级应用:使用云函数(Serverless)按需执行
  • 持续运行需求:部署在云服务器或容器平台
  • 资源要求:单核CPU+1GB内存即可满足需求

6.2 监控告警机制

  1. 日志系统:记录每次执行状态和错误信息
  2. 邮件告警:当连续失败超过3次时触发通知
  3. 数据校验:同步后自动检查记录数是否匹配

七、扩展性设计

7.1 插件化架构

将核心功能拆分为独立模块:

  1. weather_crawler/
  2. ├── config/ # 配置文件
  3. ├── crawlers/ # 数据采集器
  4. ├── processors/ # 数据处理器
  5. ├── syncers/ # 同步适配器
  6. └── scheduler.py # 任务调度

7.2 多数据源支持

通过抽象基类实现不同数据源的统一接入:

  1. from abc import ABC, abstractmethod
  2. class DataSource(ABC):
  3. @abstractmethod
  4. def fetch(self):
  5. pass
  6. class WeatherAPI(DataSource):
  7. def fetch(self):
  8. # 具体天气API实现
  9. pass
  10. class StockAPI(DataSource):
  11. def fetch(self):
  12. # 股票数据API实现
  13. pass

八、常见问题解决方案

  1. API限流问题

    • 配置随机请求间隔(1-3秒)
    • 建立代理IP池轮询使用
    • 实现指数退避重试机制
  2. 数据格式冲突

    • 使用中间格式(如JSON Schema)验证数据
    • 开发数据转换管道进行动态适配
  3. 网络不稳定处理

    • 添加重试计数器
    • 实现断点续传功能
    • 配置本地缓存作为后备

通过该技术方案,企业可实现天气数据的全自动采集与同步,运营团队无需手动干预即可获取最新数据。实际测试显示,43个城市的数据采集与同步可在3分钟内完成,准确率达到99.7%。该架构具有良好的扩展性,可快速适配其他数据源和协作平台,为企业的数据自动化建设提供可靠实践参考。