一、技术架构设计
1.1 整体技术栈
本方案采用分层架构设计,底层基于Hadoop+Hive构建分布式数据仓库,中间层通过MapReduce完成数据清洗,上层使用Python Flask框架搭建Web服务,最终通过ECharts实现可视化展示。数据流路径为:实时采集→HDFS存储→Hive元数据管理→清洗转换→关系型数据库→前端渲染。
1.2 组件版本说明
- 分布式计算:Hadoop 3.2.0(含HDFS/YARN)
- 数据仓库:Hive 3.1.2(支持事务性操作)
- 编程语言:Python 3.9(配合虚拟环境管理)
- 可视化库:ECharts 2.0(兼容主流浏览器)
- Web框架:Flask 3.0(RESTful API设计)
二、环境准备与配置
2.1 Python依赖管理
建议使用虚拟环境隔离项目依赖,通过国内镜像加速安装:
# 创建虚拟环境python -m venv venvsource venv/bin/activate# 安装核心依赖包pip install -i https://mirrors.example.com/simple/ \pandas==2.0.3 \flask==3.0.0 \flask-cors==4.0.1 \pymysql==1.1.0 \pyecharts==2.0.4
2.2 数据库服务配置
MySQL初始化
# 启动服务并验证状态systemctl start mysqldsystemctl status mysqld | grep Active# 安全配置(生产环境需修改默认密码)mysql_secure_installation
Hive服务启动
需在两个终端分别启动元数据服务和HiveServer2:
# 终端1:启动元数据服务(等待20秒初始化)/opt/hive/bin/hive --service metastore &# 终端2:启动交互服务/opt/hive/bin/hive --service hiveserver2 &# 验证服务可用性beeline -u "jdbc:hive2://localhost:10000" -n root
2.3 Hadoop集群准备
# 解除安全模式(生产环境谨慎操作)hdfs dfsadmin -safemode leave# 启动完整集群服务/opt/hadoop/sbin/start-dfs.sh/opt/hadoop/sbin/start-yarn.sh# 验证NameNode状态hdfs dfsadmin -report | grep Live
三、数据采集与存储
3.1 Flume配置管理
创建source_dir_sink_hdfs.conf配置文件,实现本地目录到HDFS的实时同步:
# 定义agent组件a1.sources = r1a1.sinks = k1a1.channels = c1# 配置源目录监控a1.sources.r1.type = spooldira1.sources.r1.spoolDir = /data/stock_sourcea1.sources.r1.fileHeader = true# HDFS存储配置a1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://namenode:8020/data/input/%Y%m%da1.sinks.k1.hdfs.filePrefix = stock_a1.sinks.k1.hdfs.round = true
启动监控服务:
/opt/flume/bin/flume-ng agent \-c /opt/flume/conf \-f /data/jobs/source_dir_sink_hdfs.conf \-n a1 \-Dflume.root.logger=INFO,console
3.2 Hive表结构设计
创建外部表映射HDFS原始数据:
CREATE EXTERNAL TABLE ods_stock_raw (stock_code STRING,trade_date DATE,open_price DECIMAL(10,2),close_price DECIMAL(10,2),volume BIGINT)PARTITIONED BY (dt STRING)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LOCATION '/data/input';
四、数据清洗与转换
4.1 MapReduce作业开发
使用Maven构建数据清洗项目,核心逻辑示例:
public class DataCleaner extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] fields = value.toString().split(",");// 数据质量校验if (fields.length < 5 || fields[2].isEmpty()) {context.getCounter("Data Quality", "Invalid Records").increment(1);return;}// 标准化处理String cleanedValue = String.join(",",fields[0].trim(),fields[1].trim(),formatDecimal(fields[2]),formatDecimal(fields[3]),fields[4].trim());context.write(new Text(fields[0]), new Text(cleanedValue));}private String formatDecimal(String input) {try {return new BigDecimal(input).setScale(2, RoundingMode.HALF_UP).toString();} catch (NumberFormatException e) {return "0.00";}}}
4.2 清洗作业执行
# 打包项目(跳过测试)mvn clean package -Dmaven.test.skip=true# 提交清洗作业hadoop jar target/data-cleaner.jar \org.example.mr.DataCleaner \/data/input \/data/stock/ods_stock_market_daily# 验证清洗结果hdfs dfs -cat /data/stock/ods_stock_market_daily/part-m-00000 | head
五、可视化大屏实现
5.1 Flask后端服务
from flask import Flask, jsonifyimport pymysqlapp = Flask(__name__)@app.route('/api/stock/trend/<stock_code>')def get_stock_trend(stock_code):conn = pymysql.connect(host='localhost',user='root',password='secure_password',database='stock_db')cursor = conn.cursor()cursor.execute("""SELECT trade_date, close_priceFROM dwd_stock_dailyWHERE stock_code = %sORDER BY trade_date DESCLIMIT 30""", (stock_code,))data = [{"date": row[0].strftime('%Y-%m-%d'), "value": float(row[1])}for row in cursor.fetchall()]return jsonify({"code": 0, "data": data})if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, debug=True)
5.2 ECharts前端集成
// 初始化K线图const chart = echarts.init(document.getElementById('main-chart'));// 异步获取数据fetch('/api/stock/trend/600519').then(res => res.json()).then(data => {const option = {title: { text: '贵州茅台股价趋势' },tooltip: { trigger: 'axis' },xAxis: { type: 'category', data: data.data.map(d => d.date) },yAxis: { type: 'value' },series: [{data: data.data.map(d => d.value),type: 'line',smooth: true,areaStyle: {}}]};chart.setOption(option);});
六、性能优化建议
- 数据分区策略:在Hive表中按日期和股票代码进行二级分区
- 清洗作业调优:设置
mapreduce.task.timeout=1800000避免长任务超时 - 缓存机制:对频繁访问的API结果实施Redis缓存
- 可视化优化:采用Web Worker处理大数据集渲染
- 监控告警:集成Prometheus监控Hive查询性能
本方案通过标准化数据管道和模块化设计,实现了股票数据从采集到展示的全链路自动化。实际部署时建议增加数据质量监控模块和异常交易检测算法,进一步提升分析价值。