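2025 RAICOM (睿抗) Robotics Developer Competition, Big Data Application Development Track: Finals Sample Questions
Project Background and Introduction
Project Background
In the era of digital e-commerce, a user behavior analysis platform is a core component of an e-commerce system. It handles user profiling, behavior prediction, personalized recommendation, and churn early warning. This project runs on a domestic (Chinese-developed) operating system and uses a modern big data technology stack to build a full-featured, high-performance intelligent e-commerce analysis platform.
Project Introduction
This project is an e-commerce user behavior analysis platform built on a big data technology stack and designed around a microservice architecture. The system provides the following core functions:
Real-time user behavior monitoring: Kafka-based real-time stream processing and live display of user behavior.
Data analysis engine: Spark-based offline data processing and Hive data warehouse analysis.
Intelligent analysis: machine-learning-based user segmentation, churn prediction, and product recommendation.
Visualization: ECharts-based multidimensional data visualization and interactive charts.
Web service: FastAPI-based RESTful API and real-time data push.
Front-end interface: responsive design, real-time data updates, and a user-friendly UI.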
Technology Stack
Big data platform: Hadoop HDFS + YARN + Spark + Hive + HBase + Kafka.
Back end: FastAPI + Pydantic + Uvicorn + SQLite/MariaDB.
AI algorithms: PyTorch + Scikit-learn + machine learning prediction models.
Front end: HTML5 + CSS3 + JavaScript + ECharts.
Development environment: Python 3.12 + Anaconda + Docker + Ubuntu.
Project Structure
Plaintext
ecommerce-user-behavior/
├── src/                     # Core source code
│   ├── data_models.py       # Data model definitions
│   ├── data_generator.py    # Real-time data generator
│   ├── data_processor.py    # Data processor
│   └── ml_analyzer.py       # Machine learning analyzer
├── web/                     # Web service
│   ├── app.py               # FastAPI main application
│   └── static/              # Static files
│       └── index.html       # Front-end page
├── config/                  # Configuration files
│   ├── settings.py          # Project settings
│   └── config.py            # System configuration
├── data/                    # Data storage
│   ├── raw/                 # Raw data
│   └── processed/           # Processed data
├── models/                  # Model files
├── scripts/                 # Scripts
│   ├── start_project.sh     # Project start script
│   └── stop_project.sh      # Project stop script
├── logs/                    # Log files
├── requirements.txt         # Python dependencies
└── README.md                # Project documentation
Hands-On Competition Questions
Module 1: Big Data Platform Setup (20%)
Task 1: Build a Big Data Platform on a Domestic Operating System
Question 1.1 In the domestic operating system environment, the Hadoop environment variables must be configured. Complete the following:
Bash
# 1. Set the JAVA_HOME environment variable
export JAVA_HOME=____
# 2. Set the HADOOP_HOME environment variable
export HADOOP_HOME=____
# 3. Set the HDFS configuration directory
export HADOOP_CONF_DIR=____
# 4. Add the Hadoop commands to PATH
export PATH=$PATH:____/bin:____/sbin
Answer: ____ ____ ____ ____ ____ (1 point per blank, 5 points total)
Question 1.2 The Hadoop configuration files need correct HDFS settings. Fill in the blanks in hdfs-site.xml:
XML
<property>
  <name>dfs.replication</name>
  <value>____</value>
</property>
<property>
  <name>dfs.namenode.name.dir</name>
  <value>____</value>
</property>
<property>
  <name>dfs.datanode.data.dir</name>
  <value>____</value>
</property>
<property>
  <name>dfs.namenode.http-address</name>
  <value>____</value>
</property>
Answer: ____ ____ ____ ____ (1 point per blank, 4 points total)
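Hadoop site files such as hdfs-site.xml wrap each setting in a `<property>` element with `<name>` and `<value>` children under a `<configuration>` root. The sketch below generates that layout from a dictionary so the structure is easy to see. The two values are hypothetical single-node placeholders, not the expected answers, and `build_site_xml` is a helper name introduced only for this illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical single-node values, for illustration only.
settings = {
    "dfs.replication": "1",
    "dfs.namenode.http-address": "localhost:9870",
}


def build_site_xml(props):
    """Render a dict as a Hadoop-style <configuration> document."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


xml_text = build_site_xml(settings)
print(xml_text)
```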
Task 2: Build the Offline Processing Platform
Question 2.1 The Spark configuration needs a correct Master URL. Fill in the blanks in spark-defaults.conf:
Properties
# Spark configuration
spark.master=____
spark.app.name=____
spark.driver.memory=____
spark.executor.memory=____
Answer: ____ ____ ____ ____ (1 point per blank, 4 points total)
Question 2.2 The Hive configuration needs a correct metadata store. Fill in the blanks in hive-site.xml:
XML
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:derby:;databaseName=____;create=true</value>
</property>
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>____</value>
</property>
Answer: ____ ____ (1 point per blank, 2 points total)
Task 3: Build the Real-Time Processing Platform
Question 3.1 The Kafka configuration needs a correct ZooKeeper connection. Fill in the blanks in server.properties:
Properties
# Kafka configuration
broker.id=0
listeners=____://localhost:9092
log.dirs=____
zookeeper.connect=____
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Question 3.2 When creating a Kafka topic, the partition count and replication factor must be set correctly. Fill in the blanks:
Bash
# Create the user behavior topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic user-behavior --partitions ____ --replication-factor ____
Answer: ____ ____ (1 point per blank, 2 points total)
Module 2: Data Processing (25%)
Task 4: Data Extraction
Question 4.1 The data generator has a JSON serialization problem that must be fixed. Fill in the blank in data_generator.py:
Python
# Kafka producer configuration
self.producer = KafkaProducer(
    bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'],
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False, default=____).encode('utf-8')
)
Answer: ____ (1 point)
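The `default` argument of `json.dumps` names a callable that the encoder invokes for any object it cannot serialize natively, such as `datetime` or `Enum` values. The following is a minimal sketch of that mechanism, not the project's actual serializer. `to_jsonable`, the one-member `BehaviorType` enum, and the sample `event` are hypothetical names introduced for illustration.

```python
import json
from datetime import datetime
from enum import Enum


class BehaviorType(Enum):  # hypothetical stand-in for the project's enum
    VIEW = "view"


def to_jsonable(obj):
    """Fallback called by json.dumps for objects it cannot encode itself."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


event = {"user": "u1", "type": BehaviorType.VIEW, "ts": datetime(2025, 1, 1, 12, 0)}
payload = json.dumps(event, ensure_ascii=False, default=to_jsonable).encode("utf-8")
decoded = json.loads(payload.decode("utf-8"))
print(decoded)
```

Raising `TypeError` for unknown types keeps unexpected objects from being silently stringified; a simpler fallback trades that safety for convenience.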
Question 4.2 When generating user behavior data, the correct behavior type weights must be added. Fill in the blanks in data_generator.py:
Python
def generate_user_behavior(self):
    user = random.choice(self.users)
    product = random.choice(self.products)
    session = self._get_or_create_session(user.user_id)
    # Behavior type weights
    behavior_weights = {
        BehaviorType.VIEW: ____,
        BehaviorType.CLICK: ____,
        BehaviorType.ADD_TO_CART: ____,
        BehaviorType.REMOVE_FROM_CART: ____,
        BehaviorType.SEARCH: ____,
        BehaviorType.FAVORITE: 0.05,
        BehaviorType.SHARE: ____,
        BehaviorType.REVIEW: 0.05
    }
Answer: ____ ____ ____ ____ ____ ____ (1 point per blank, 6 points total)
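A weight table like this is typically consumed by a weighted random draw. The sketch below shows one way to do that with `random.choices` from the standard library. The three-member enum and the weights are illustrative assumptions, not the values the question expects. `random.choices` accepts relative weights, so the sum-to-one check is an optional sanity guard added here, not a library requirement.

```python
import random
from enum import Enum


class BehaviorType(Enum):  # hypothetical subset of the project's enum
    VIEW = "view"
    CLICK = "click"
    SHARE = "share"


# Illustrative weights only; not the values expected by the question.
behavior_weights = {BehaviorType.VIEW: 0.6, BehaviorType.CLICK: 0.3, BehaviorType.SHARE: 0.1}


def pick_behavior(weights, rng=random):
    """Draw one key with probability proportional to its weight."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights should sum to 1")
    kinds = list(weights)
    return rng.choices(kinds, weights=[weights[k] for k in kinds], k=1)[0]


rng = random.Random(0)  # fixed seed so the demo is repeatable
sample = [pick_behavior(behavior_weights, rng) for _ in range(1000)]
```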
Question 4.3 When generating order data, the correct order status selection must be added. Fill in the blanks:
Python
def generate_order(self):
    user = random.choice(self.users)
    if random.random() > DATA_GENERATION_CONFIG['order_probability']:
        return None
    # Pick 1 to 5 products
    num_items = random.randint(1, 5)
    selected_products = random.sample(self.products, num_items)
    order_id = str(uuid.uuid4())
    order_time = datetime.now()
    # Compute the order amount
    total_amount = 0
    order_items = []
    for product in selected_products:
        quantity = random.randint(1, 3)
        unit_price = product.price
        total_price = unit_price * quantity
        total_amount += total_price
        order_item = OrderItem(
            item_id=str(uuid.uuid4()),
            order_id=order_id,
            product_id=product.product_id,
            quantity=quantity,
            unit_price=unit_price,
            total_price=total_price
        )
        order_items.append(order_item)
    # Compute the discount and the final amount
    discount_amount = total_amount * random.uniform(0, 0.3)
    shipping_fee = random.uniform(0, 20) if total_amount < ____ else 0
    final_amount = total_amount - discount_amount + shipping_fee
    order = Order(
        user_id=user.user_id,
        order_id=order_id,
        order_time=order_time,
        total_amount=round(total_amount, 2),
        discount_amount=round(discount_amount, 2),
        final_amount=round(final_amount, 2),
        status=random.choice(list(____)),
        payment_method=random.choice(self.payment_methods),
        shipping_address=f"{random.choice(self.cities)}{random.randint(1, 10)}区{random.randint(1, 100)}号",
        shipping_fee=round(shipping_fee, 2),
        coupon_code=random.choice([None, f"COUPON_{random.randint(1000, 9999)}"]),
        notes=random.choice([None, "请尽快发货", "包装精美", "货到付款"])
    )
    return order, order_items
Answer: ____ ____ (1 point per blank, 2 points total)
Question 4.4 When initializing the data generator, the correct device type list must be added. Fill in the blanks:
Python
def __init__(self):
    ____
    self.producer = KafkaProducer(
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'],
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False, default=str).encode('utf-8')
    )
    ____
    # Initialize the base data
    self.cities = ['北京', '上海', '广州', '深圳', '杭州', '南京', '成都', '武汉', '西安', '重庆']
    self.device_types = ____
    self.browsers = ['Chrome', 'Firefox', 'Safari', 'Edge', 'Opera']
    self.payment_methods = ['alipay', 'wechat', 'credit_card', 'debit_card', 'bank_transfer']
    self._initialize_data()
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Task 5: Data Cleaning
Question 5.1 The data processor needs a correct Kafka consumer configuration. Fill in the blanks in data_processor.py:
Python
def __init__(self):
    self.consumer_behavior = KafkaConsumer(
        KAFKA_CONFIG['user_behavior_topic'],
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'],
        auto_offset_reset=____,
        enable_auto_commit=____,
        group_id=____,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Question 5.2 The data cleaning step needs correct outlier handling. Fill in the blanks:
Python
def process_behavior_message(self, message):
    try:
        behavior = UserBehavior(**message.value)
        # Data cleaning: drop outliers
        if behavior.duration and (behavior.duration < 0 or behavior.duration > ____):
            return  # Skip records with an abnormal duration
        if behavior.properties and behavior.properties.get('time_on_page', 0) < 0:
            behavior.properties['time_on_page'] = ____
        self.behavior_data.append(behavior.model_dump())
        # Save to CSV
        df = pd.DataFrame([behavior.model_dump(mode='json')])
        df.to_csv(self.processed_data_path, mode=____, header=False, index=False)
    except Exception as e:
        print(f"Error processing behavior message: {e}")
Answer: ____ ____ ____ (1 point per blank, 3 points total)
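The cleaning logic above combines two strategies: records with an implausible duration are dropped, while a negative `time_on_page` is repaired in place. A standalone sketch of that drop-versus-repair split, using plain dictionaries, might look as follows. The one-hour upper bound and the replacement value of 0 are assumptions chosen for illustration, not the expected answers, and `clean_record` is a hypothetical helper.

```python
MAX_DURATION_SECONDS = 3600  # hypothetical upper bound, not the exam's answer


def clean_record(record, max_duration=MAX_DURATION_SECONDS):
    """Return a cleaned copy of the record, or None if it should be dropped."""
    duration = record.get("duration")
    if duration is not None and not (0 <= duration <= max_duration):
        return None  # out-of-range duration: drop the whole record
    cleaned = dict(record)
    props = dict(cleaned.get("properties") or {})
    if props.get("time_on_page", 0) < 0:
        props["time_on_page"] = 0  # repair the field rather than drop the record
    cleaned["properties"] = props
    return cleaned


print(clean_record({"duration": 10, "properties": {"time_on_page": -3}}))
```

Returning a copy leaves the input untouched, which makes the function easy to test in isolation.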
Question 5.3 The data analysis step needs correct aggregate statistics. Fill in the blanks:
Python
def _perform_analysis(self):
    try:
        df = pd.read_csv(self.processed_data_path)
        if df.empty:
            return
        # Extract time features
        df['timestamp'] = pd.____(df['timestamp'])
        df['hour'] = df['timestamp'].dt.____
        df['day_of_week'] = df['timestamp'].dt.____
        # Behavior counts per hour
        hourly_behavior = df.groupby('____')['behavior_type'].____().to_dict()
        # Device type statistics
        device_distribution = df.groupby('device_type')['behavior_type'].agg(
            lambda x: x.____[0] if not x.mode().empty else 'unknown'
        ).to_dict()
        # User activity analysis
        user_activity = df.groupby('user_id')['behavior_type'].____().to_dict()
        analysis_results = {
            "timestamp": datetime.now().isoformat(),
            "hourly_behavior": hourly_behavior,
            "device_distribution": device_distribution,
            "user_activity": user_activity
        }
        with open(self.analysis_data_path, 'w') as f:
            json.dump(analysis_results, f, ensure_ascii=False, indent=4)
    except Exception as e:
        print(f"Error performing analysis: {e}")
Answer: ____ ____ ____ ____ ____ ____ ____ (1 point per blank, 7 points total)
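The group-and-count pattern in this question can be pictured without pandas. The sketch below reproduces the hourly and per-user counts with `collections.Counter`. It is a standard-library analogue of the idea, not the project's pandas code, and the three sample rows are invented for illustration.

```python
from collections import Counter
from datetime import datetime

rows = [  # hypothetical sample records
    {"user_id": "u1", "behavior_type": "view", "timestamp": "2025-01-01T09:15:00"},
    {"user_id": "u1", "behavior_type": "click", "timestamp": "2025-01-01T09:40:00"},
    {"user_id": "u2", "behavior_type": "view", "timestamp": "2025-01-01T21:05:00"},
]

hourly_behavior = Counter()
user_activity = Counter()
for row in rows:
    ts = datetime.fromisoformat(row["timestamp"])
    hourly_behavior[ts.hour] += 1       # events per hour of day
    user_activity[row["user_id"]] += 1  # events per user

print(dict(hourly_behavior), dict(user_activity))
```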
Module 3: Data Mining (15%)
Task 6: Feature Engineering
Question 6.1 The machine learning analyzer needs correct feature engineering. Fill in the blanks in ml_analyzer.py:
Python
def _get_user_data(self):
    try:
        df_behaviors = pd.read_csv(os.path.join(DATA_DIR, 'user_behaviors.csv'))
        df_orders = pd.read_csv(os.path.join(DATA_DIR, 'order_events.csv'))
        df_profiles = pd.read_csv(os.path.join(DATA_DIR, 'user_profiles.csv'))
        # Convert timestamps
        df_behaviors['timestamp'] = pd.to_datetime(df_behaviors['timestamp'])
        df_orders['order_date'] = pd.to_datetime(df_orders['order_date'])
        df_profiles['last_active'] = pd.____(df_profiles['last_active'])
        # Aggregate behavior data
        user_activity = df_behaviors.groupby('user_id').agg(
            total_views=('event_type', lambda x: (x == 'view').sum()),
            total_clicks=('event_type', lambda x: (x == 'click').sum()),
            total_add_to_cart=('event_type', lambda x: (x == 'add_to_cart').sum()),
            total_purchases=('event_type', lambda x: (x == 'purchase').sum()),
            last_behavior_time=('timestamp', 'max')
        ).reset_index()
        # Aggregate order data
        user_orders = df_orders.groupby('user_id').agg(
            total_order_count=('order_id', 'count'),
            total_spent=('total_amount', 'sum'),
            last_purchase_date=('order_date', 'max')
        ).reset_index()
        # Merge the data
        user_data = pd.merge(df_profiles, user_activity, on='user_id', how='left')
        user_data = pd.merge(user_data, user_orders, on='user_id', how='left')
        user_data = user_data.fillna(0)  # Fill NaN values
        # Feature engineering
        user_data['days_since_last_active'] = (datetime.now() - user_data['last_active']).dt.days
        user_data['days_since_last_purchase'] = (datetime.now() - user_data['last_purchase_date']).dt.____
        user_data['days_since_last_purchase'] = user_data['days_since_last_purchase'].fillna(user_data['days_since_last_active'])
        # Define the churn label
        user_data['churn'] = (user_data['days_since_last_active'] > ____).astype(int)
        return user_data
    except FileNotFoundError:
        print("Required data files not found for ML analysis.")
        return pd.DataFrame()
    except Exception as e:
        print(f"Error loading or processing user data for ML: {e}")
        return pd.DataFrame()
Answer: ____ ____ ____ (1 point per blank, 3 points total)
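The churn label in this question is a threshold on a recency feature: the number of whole days since a user was last active. The sketch below works through that arithmetic for two users with the standard library only. The 30-day cutoff and the helper name `churn_label` are assumptions made for illustration, not the expected answer.

```python
from datetime import datetime

CHURN_THRESHOLD_DAYS = 30  # hypothetical cutoff for illustration


def churn_label(last_active, now, threshold_days=CHURN_THRESHOLD_DAYS):
    """Return (days_inactive, label), where label is 1 for churned users."""
    days_inactive = (now - last_active).days
    return days_inactive, int(days_inactive > threshold_days)


now = datetime(2025, 3, 1)  # fixed reference time so the result is repeatable
active_user = churn_label(datetime(2025, 2, 20), now)
lapsed_user = churn_label(datetime(2025, 1, 1), now)
print(active_user, lapsed_user)
```

Passing `now` explicitly, instead of calling `datetime.now()` inside the function, keeps the feature deterministic when the analysis is rerun.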
Question 6.2 The user segmentation analysis needs a correct clustering algorithm. Fill in the blanks:
Python
def analyze_user_segmentation(self, n_clusters=4):
    user_data = self._get_user_data()
    if user_data.empty:
        return [], {}  # Keep the two-value return shape that callers unpack
    features = user_data[['total_views', 'total_clicks', 'total_purchases', 'total_spent', 'days_since_last_active']]
    if self.scaler is None:
        self.scaler = StandardScaler()
        features_scaled = self.scaler.____(features)
    else:
        features_scaled = self.scaler.transform(features)
    if self.user_segmentation_model is None:
        self.user_segmentation_model = KMeans(n_clusters=n_clusters, random_state=____, n_init=10)
        self.user_segmentation_model.____(features_scaled)
    else:
        ____
    self._save_models()
    user_data['segment'] = self.user_segmentation_model.____(features_scaled)
    segment_analysis = user_data.groupby('segment').agg(
        count=('user_id', 'count'),
        avg_views=('total_views', 'mean'),
        avg_clicks=('total_clicks', 'mean'),
        avg_purchases=('total_purchases', 'mean'),
        avg_spent=('total_spent', 'mean'),
        avg_days_inactive=('days_since_last_active', 'mean')
    ).to_dict('index')
    return user_data[['user_id', 'segment']].to_dict('records'), segment_analysis
Answer: ____ ____ ____ ____ (1 point per blank, 4 points total)
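The two branches around the scaler reflect a general rule: scaling statistics are learned once and then reused, so that later data is transformed with the same mean and standard deviation. The toy class below illustrates that split between fitting and transforming for a single feature column. `SimpleScaler` is a hypothetical teaching aid written with the standard library, not scikit-learn's implementation.

```python
from statistics import fmean, pstdev


class SimpleScaler:
    """Tiny z-score scaler for one feature column (illustrative only)."""

    def fit(self, values):
        self.mean_ = fmean(values)
        self.std_ = pstdev(values) or 1.0  # fall back to 1.0 for constant columns
        return self

    def transform(self, values):
        return [(v - self.mean_) / self.std_ for v in values]

    def fit_transform(self, values):
        return self.fit(values).transform(values)


scaler = SimpleScaler()
train_scaled = scaler.fit_transform([10.0, 20.0, 30.0])
new_scaled = scaler.transform([40.0])  # reuses the training mean and std
print(train_scaled, new_scaled)
```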
Task 7: Model Training
Question 7.1 Training the churn prediction model requires a correct data split. Fill in the blanks:
Python
def train_churn_prediction_model(self):
    user_data = self._get_user_data()
    if user_data.empty or 'churn' not in user_data.columns or len(user_data['churn'].unique()) < 2:
        print("Not enough data or churn labels for training churn prediction model.")
        return False
    features = user_data[['total_views', 'total_clicks', 'total_purchases', 'total_spent', 'days_since_last_active', 'days_since_last_purchase']]
    labels = user_data['churn']
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=____, random_state=____
    )
    if self.scaler is None:
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.____(X_train)
        X_test_scaled = self.scaler.transform(X_test)
    else:
        X_train_scaled = self.scaler.transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
    self.churn_prediction_model = RandomForestClassifier(
        n_estimators=____,
        random_state=42
    )
    self.churn_prediction_model.____(X_train_scaled, y_train)
    y_pred = self.churn_prediction_model.____(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Churn prediction model trained with accuracy: {accuracy}")
    self._save_models()
    return True
Answer: ____ ____ ____ ____ ____ ____ (1 point per blank, 6 points total)
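A hold-out split has two knobs: the share of rows reserved for testing and a seed that makes the shuffle repeatable. The sketch below shows both with a hand-written helper. It is a plain-Python illustration of the idea and not scikit-learn's `train_test_split`; the 25% share and the seed value are assumptions, not the expected answers.

```python
import random


def split(items, test_size=0.25, seed=0):
    """Shuffle with a fixed seed and hold out the last `test_size` share."""
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)
    n_test = max(1, round(len(shuffled) * test_size))
    return shuffled[:-n_test], shuffled[-n_test:]


train, test = split(range(20))
print(len(train), len(test))
```

Because the seed is fixed, calling `split(range(20))` again yields the same partition, which is what makes accuracy figures comparable across runs.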
Question 7.2 The product recommendation algorithm needs correct recommendation logic. Fill in the blanks:
Python
def recommend_products(self, user_id: str, num_recommendations: int = 5):
    user_data = self._get_user_data()
    user_profile_row = user_data[user_data['user_id'] == user_id]
    if user_profile_row.empty:
        return {"user_id": user_id, "recommendations": [], "message": "User not found"}
    try:
        df_products = pd.read_csv(os.path.join(DATA_DIR, 'product_events.csv'))
    except FileNotFoundError:
        print("Product data file not found for recommendations.")
        return {"user_id": user_id, "recommendations": [], "message": "Product data not available"}
    favorite_categories = eval(user_profile_row['favorite_categories'].iloc[0])
    recommended_products = []
    for category in favorite_categories:
        category_products = df_products[df_products['category'] == category].nlargest(
            num_recommendations,
            '____'
        )
        recommended_products.extend(category_products['product_id'].tolist())
    # If there are too few recommendations, top up with highly rated products
    if len(recommended_products) < num_recommendations:
        top_products = df_products.nlargest(num_recommendations, '____')['product_id'].tolist()
        for prod_id in top_products:
            if prod_id not in recommended_products:
                recommended_products.append(prod_id)
            if len(recommended_products) >= num_recommendations:
                break
    return {"user_id": user_id, "recommendations": recommended_products[:num_recommendations]}
Answer: ____ ____ (1 point per blank, 2 points total)
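The recommendation logic is a two-stage ranking: take the best items in each favorite category, then top up from the best items overall until enough are collected. The sketch below mirrors that flow with `heapq.nlargest` on a small in-memory catalogue. The catalogue rows and the `rating` score column are hypothetical; the question does not state which column the project sorts by.

```python
import heapq

products = [  # hypothetical catalogue rows
    {"product_id": "p1", "category": "books", "rating": 4.8},
    {"product_id": "p2", "category": "books", "rating": 4.1},
    {"product_id": "p3", "category": "toys", "rating": 4.9},
    {"product_id": "p4", "category": "toys", "rating": 3.5},
]


def recommend(favorite_categories, n, score_key="rating"):
    """Top-n per favorite category, topped up with the best overall items."""
    picked = []
    for category in favorite_categories:
        in_category = [p for p in products if p["category"] == category]
        best = heapq.nlargest(n, in_category, key=lambda p: p[score_key])
        picked.extend(p["product_id"] for p in best)
    # Fallback: walk all products from best to worst until n are collected.
    for p in heapq.nlargest(len(products), products, key=lambda p: p[score_key]):
        if len(picked) >= n:
            break
        if p["product_id"] not in picked:
            picked.append(p["product_id"])
    return picked[:n]


print(recommend(["books"], 3))
```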
Module 4: Data Collection and Real-Time Computing (20%)
Task 8: Data Collection
Question 8.1 Kafka topic management needs correct topic creation logic. Fill in the blanks in start_project.sh:
Bash
# Check whether each Kafka topic exists and create it if it does not
KAFKA_TOPICS=("user-behavior" "order-events" "product-events" "user-profile")
for TOPIC in "${KAFKA_TOPICS[@]}"; do
    EXISTS=$(/home/zkpk/bigdata/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | grep -w "$TOPIC")
    if [ -z "$EXISTS" ]; then
        echo "Creating Kafka topic: $TOPIC"
        /home/zkpk/bigdata/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic "$TOPIC" --partitions ____ --replication-factor ____
    else
        echo "Kafka topic '$TOPIC' already exists."
    fi
done
Answer: ____ ____ (1 point per blank, 2 points total)
Question 8.2 Starting the data generator requires the correct environment settings. Fill in the blanks:
Bash
# Start the data generator
echo "Starting the user behavior data generator..."
____ "$PROJECT_HOME/src/data_generator.py" > "$LOGS_DIR/data_generator.log" 2>&1 &
echo $! > "$PID_DIR/____"
Answer: ____ ____ (1 point per blank, 2 points total)
Task 9: Data Processing
Question 9.1 The web application needs a correct Kafka consumer thread. Fill in the blanks in app.py:
Python
def consume_kafka_messages():
    global realtime_metrics
    consumer_behavior = KafkaConsumer(
        KAFKA_CONFIG['____'],
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'],
        auto_offset_reset='____',
        enable_auto_commit=____,
        group_id='fastapi-behavior-consumer',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    print("Starting to consume Kafka data...")
    for message in consumer_behavior.poll(timeout_ms=100).values():
        try:
            for msg in message:
                behavior = UserBehavior(**msg.value)
                realtime_behaviors.append(behavior.model_dump())
                # Update the real-time metrics
                realtime_metrics["behavior_distribution"][behavior.event_type] += 1
                realtime_metrics["device_distribution"][behavior.device_type] += 1
                realtime_metrics["hourly_activity"][behavior.timestamp.hour] += 1
                if behavior.event_type == 'view':
                    realtime_metrics["total_views"] += 1
                elif behavior.event_type == 'click':
                    realtime_metrics["total_clicks"] += 1
                elif behavior.event_type == 'add_to_cart':
                    realtime_metrics["total_add_to_cart"] += 1
                elif behavior.event_type == 'purchase':
                    realtime_metrics["total_purchases"] += 1
        except Exception as e:
            print(f"Error processing behavior message in FastAPI: {e}, Message: {msg.value}")
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Question 9.2 The real-time data API needs the correct response format. Fill in the blanks:
Python
@app.get("/api/realtime_metrics")
async def get_realtime_metrics():
    # Make sure the hourly activity covers all 24 hours
    full_hourly_activity = ((h, realtime_metrics["hourly_activity"].get(h, 0)) for h in range(____))
    return {
        "total_views": realtime_metrics["____"],
        "total_clicks": realtime_metrics["total_clicks"],
        "total_add_to_cart": realtime_metrics["total_add_to_cart"],
        "total_purchases": realtime_metrics["____"],
        "total_orders": realtime_metrics["total_orders"],
        "total_revenue": round(realtime_metrics["total_revenue"], 2),
        "behavior_distribution": dict(realtime_metrics["behavior_distribution"]),
        "category_distribution": dict(realtime_metrics["category_distribution"]),
        "device_distribution": dict(realtime_metrics["device_distribution"]),
        "hourly_activity": dict(sorted(____))
    }
Answer: ____ ____ ____ ____ (4 points)
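A chart of hourly activity needs an entry for every hour, including hours in which nothing happened. The sketch below shows the zero-filling step on its own: it reads a sparse counter with `.get(h, 0)` so that missing hours become 0 without being inserted into the live counter. The sample counts and the helper name `full_day` are hypothetical.

```python
from collections import defaultdict

hourly_activity = defaultdict(int)  # hypothetical live counters
hourly_activity[9] += 3
hourly_activity[21] += 1


def full_day(counts, hours_per_day=24):
    """Return an hour -> count dict that contains every hour, in order."""
    pairs = ((h, counts.get(h, 0)) for h in range(hours_per_day))
    return dict(sorted(pairs))


filled = full_day(hourly_activity)
print(filled)
```

Using `.get` rather than indexing matters with a `defaultdict`: indexing would create a key for every hour as a side effect of reading.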
Question 9.3 The machine learning analysis API needs a correct user segmentation endpoint. Fill in the blanks:
Python
@app.get("/api/user_segmentation")
async def get_user_segmentation():
    try:
        user_segments, segment_summary = ml_analyzer.____()
        return {"user_segments": user_segments, "segment_summary": segment_summary}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"User segmentation failed: {e}")

@app.get("/api/churn_prediction/{user_id}")
async def get_churn_prediction(user_id: str):
    try:
        churn_risk = ml_analyzer.____(user_id)
        return churn_risk
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Churn prediction failed for user {user_id}: {e}")

@app.get("/api/product_recommendations/{user_id}")
async def get_product_recommendations(user_id: str):
    try:
        recommendations = ml_analyzer.____(user_id)
        return recommendations
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Product recommendations failed for user {user_id}: {e}")
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Question 9.4 The data stream processing needs correct real-time statistics updates. Fill in the blanks:
Python
def update_traffic_stats(data):
    global realtime_metrics
    ____
    behavior_type = data.get('behavior_type', "unknown")
    realtime_metrics["behavior_distribution"][behavior_type] += 1
    # Update the device distribution statistics
    device_type = data.get('device_type', "unknown")
    realtime_metrics["device_distribution"][device_type] += 1
    # Update the hourly activity statistics
    timestamp = data.get('timestamp', datetime.now())
    if isinstance(timestamp, str):
        timestamp = datetime.____(timestamp)
    hour = timestamp.____
    realtime_metrics["hourly_activity"][hour] += 1
    # Update the totals
    if behavior_type == 'view':
        realtime_metrics["total_views"] += 1
    elif behavior_type == 'click':
        realtime_metrics["total_clicks"] += 1
    elif behavior_type == 'add_to_cart':
        realtime_metrics["total_add_to_cart"] += 1
    elif behavior_type == 'purchase':
        realtime_metrics["total_purchases"] += 1
        realtime_metrics["total_orders"] += 1
        realtime_metrics["total_revenue"] += data.get('total_amount', 0)
Answer: ____ ____ ____ (3 points)
Question 9.5 Data persistence needs correct save logic. Fill in the blanks:
Python
def save_data_periodically(self):
    if self.behavior_data:
        df_behavior = pd.DataFrame(self.behavior_data)
        df_behavior['timestamp'] = pd.to_datetime(df_behavior['timestamp'])
        df_behavior.to_csv(
            os.path.join(DATA_DIR, 'user_behaviors.csv'),
            mode=____,
            header=not os.path.exists(os.path.join(DATA_DIR, 'user_behaviors.csv')),
            index=False
        )
        self.behavior_data = []  # Clear the buffer
    if self.order_data:
        df_orders = pd.DataFrame(self.order_data)
        df_orders['order_date'] = pd.to_datetime(df_orders['order_date'])
        df_orders.to_csv(
            os.path.join(DATA_DIR, 'order_events.csv'),
            mode=____,
            header=not os.path.exists(os.path.join(DATA_DIR, 'order_events.csv')),
            index=False
        )
        self.order_data = []  # Clear the buffer
    if self.user_profile_data:
        df_user_profiles = pd.DataFrame(list(self.user_profile_data.values()))
        df_user_profiles['last_active'] = pd.to_datetime(df_user_profiles['last_active'])
        df_user_profiles.to_csv(
            os.path.join(DATA_DIR, 'user_profiles.csv'),
            mode=____,
            index=False
        )  # Overwrite with the latest state
Answer: ____ ____ ____ (1 point per blank, 3 points total)
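The save logic distinguishes two persistence patterns: event logs grow by appending, with the header written only when the file is first created, while a state snapshot is rewritten in full each time. The sketch below demonstrates both patterns with the standard `csv` module in a temporary directory. It is an analogue of the idea and not the project's pandas code; `append_rows`, `overwrite_rows`, and the file name are hypothetical.

```python
import csv
import os
import tempfile


def append_rows(path, rows, fieldnames):
    """Append rows to a CSV, writing the header only when the file is new."""
    is_new = not os.path.exists(path)
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if is_new:
            writer.writeheader()
        writer.writerows(rows)


def overwrite_rows(path, rows, fieldnames):
    """Replace the file with a fresh snapshot (header always written)."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


demo_path = os.path.join(tempfile.mkdtemp(), "events.csv")  # throwaway file
append_rows(demo_path, [{"id": 1}], ["id"])
append_rows(demo_path, [{"id": 2}], ["id"])
with open(demo_path, encoding="utf-8") as f:
    lines = f.read().splitlines()
print(lines)
```

The existence check has to run before the file is opened, because opening in append mode creates the file.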
Module 5: Data Visualization (10%)
Task 10: Building Charts
Question 10.1 The front-end JavaScript needs correct chart initialization. Fill in the blanks in index.html:
JavaScript
// Initialize the charts
function initCharts() {
    charts.behaviorDistribution = echarts.init(document.getElementById('behaviorDistributionChart'));
    charts.deviceDistribution = echarts.____(document.getElementById('deviceDistributionChart'));
    charts.hourlyActivity = echarts.init(document.getElementById('hourlyActivityChart'));
    charts.userSegmentation = echarts.init(document.getElementById('userSegmentationChart'));
    // Responsive resizing
    window.addEventListener('resize', function() {
        Object.values(charts).forEach(chart => chart.____());
    });
}
Answer: ____ ____ (1 point per blank, 2 points total)
Question 10.2 The user behavior distribution chart needs a correct pie chart configuration. Fill in the blanks:
JavaScript
// Update the user behavior distribution chart
function updateBehaviorChart(data) {
    const behaviorData = Object.entries(data.behavior_distribution).map(([name, value]) => ({
        name: name === 'view' ? 'View' :
              name === 'click' ? 'Click' :
              name === 'add_to_cart' ? 'Add to cart' :
              name === 'purchase' ? 'Purchase' :
              name === 'favorite' ? 'Favorite' : 'Share',
        value: ____
    }));
    charts.behaviorDistribution.setOption({
        title: { text: 'User Behavior Distribution', left: 'center', show: false },
        tooltip: { trigger: 'item' },
        legend: { orient: 'vertical', left: 'left' },
        series: [{
            name: 'Behavior type',
            type: '____',
            radius: '____',
            data: behaviorData,
            emphasis: {
                itemStyle: {
                    shadowBlur: 10,
                    shadowOffsetX: 0,
                    shadowColor: 'rgba(0, 0, 0, 0.5)'
                }
            }
        }]
    });
}
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Question 10.3 The 24-hour activity trend chart needs a correct time series configuration. Fill in the blanks:
JavaScript
// Update the 24-hour activity trend chart
function updateHourlyActivityChart(data) {
    const hours = Object.keys(data.hourly_activity).map(hour => `${hour}:00`);
    const activityData = Object.values(data.hourly_activity);
    charts.hourlyActivity.setOption({
        title: { text: '24-Hour Activity Trend', left: 'center', show: false },
        tooltip: { trigger: 'axis' },
        xAxis: { type: 'category', data: hours },
        yAxis: { type: 'value', name: 'Activity' },
        series: [{
            name: 'Activity',
            type: '____',
            data: activityData,
            smooth: ____,
            itemStyle: { color: '#91cc75' }
        }]
    });
}
Answer: ____ ____ (1 point per blank, 2 points total)
Task 11: Analyzing Data
Question 11.1 The real-time monitoring feature needs correct periodic update logic. Fill in the blanks:
JavaScript
// Start real-time monitoring
function startRealTime() {
    if (realTimeInterval) clearInterval(realTimeInterval);
    realTimeInterval = setInterval(async () => {
        try {
            const response = await fetch('____');
            const data = await response.json();
            if (data.____ === 'success') {
                updateBehaviorChart(data);
                updateDeviceChart(data);
                updateHourlyActivityChart(data);
                // Update the metric display
                document.getElementById('totalViews').innerText = data.total_views;
                document.getElementById('totalClicks').innerText = data.total_clicks;
                document.getElementById('totalPurchases').innerText = data.total_purchases;
                document.getElementById('totalRevenue').innerText = data.total_revenue.toFixed(2);
                document.getElementById('status').textContent = 'Status: monitoring in real time - ' + new Date().toLocaleTimeString();
            }
        } catch (error) {
            console.error('Real-time monitoring error:', error);
        }
    }, ____); // Refresh every 3 seconds
    document.getElementById('startBtn').disabled = true;
    document.getElementById('stopBtn').disabled = false;
    document.getElementById('status').textContent = 'Status: real-time monitoring started';
}
Answer: ____ ____ ____ (1 point per blank, 3 points total)
Module 6: Comprehensive Analysis (5%)
Task 12: Analyzing Data
Question 12.1 The system health check needs correct service status monitoring. Fill in the blank in app.py:
Python
@app.get("/api/health")
async def health_check():
    kafka_status = "healthy"
    try:
        # Try listing the topics to verify the Kafka connection
        ____
    except Exception:
        kafka_status = "unhealthy"
    ml_model_status = "healthy" if ml_analyzer.user_segmentation_model and ml_analyzer.churn_prediction_model else "uninitialized"
    return {
        "status": "ok",
        "kafka_status": kafka_status,
        "ml_models": ml_model_status,
        "timestamp": datetime.now()
    }
Answer: ____ (1 point)
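The health check follows a common probe pattern: attempt a cheap operation against the dependency and translate any exception into a status string instead of letting it propagate. The sketch below isolates that pattern with a generic `probe` helper. Both the helper and the simulated failing check are hypothetical; no real Kafka client call is made or implied.

```python
def probe(check):
    """Run a zero-argument check and map success or failure to a status string."""
    try:
        check()
        return "healthy"
    except Exception:
        return "unhealthy"


def failing_check():  # hypothetical probe that simulates an unreachable broker
    raise ConnectionError("broker unreachable")


statuses = {"ok_service": probe(lambda: None), "kafka": probe(failing_check)}
print(statuses)
```

Keeping the endpoint itself from raising means a monitoring system still receives a structured response when a dependency is down.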
Question 12.2 The data analysis API needs correct statistics. Fill in the blanks:
Python
@app.get("/api/behavior/analysis")
async def get_behavior_analysis():
    try:
        return {
            "status": "____",
            "data": {
                "behavior_distribution": realtime_metrics["behavior_distribution"],
                "device_distribution": realtime_metrics["device_distribution"],
                "hourly_activity": [
                    {
                        "hour": f"{i}:00",
                        "activity": realtime_metrics["hourly_activity"].get(i, 0)
                    } for i in range(____)
                ]
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Answer: ____ ____ (1 point per blank, 2 points total)
Task 13: Proposing a Solution
Question 13.1 The project start script needs the correct service startup order. Fill in the blanks in start_project.sh:
Bash
#!/bin/bash
PROJECT_HOME="/home/zkpk/projects/ecommerce-user-behavior"
PYTHON_ENV="/home/zkpk/Anaconda3/bin/python"
KAFKA_HOME="/home/zkpk/bigdata/kafka"
echo "Starting the e-commerce user behavior analysis platform..."
# 1. Check the status of the big data environment
echo "Checking the big data environment..."
/home/zkpk/bigdata/check-bigdata.sh
if [ $? -ne 0 ]; then
    echo "Big data environment check failed. Make sure all components have started correctly."
    exit ____
fi
# 2. Check and create the Kafka topics
echo "Checking Kafka topics..."
export PATH=$PATH:$KAFKA_HOME/bin
____
# 3. Start the data generator
echo "Starting the user behavior data generator..."
nohup $PYTHON_ENV $PROJECT_HOME/src/data_generator.py > $PROJECT_HOME/logs/data_generator.log 2>&1 &
echo $! > $PROJECT_HOME/pids/data_generator.pid
sleep 2
# 4. Start the data processor
echo "Starting the data processor..."
nohup $PYTHON_ENV $PROJECT_HOME/src/data_processor.py > $PROJECT_HOME/logs/data_processor.log 2>&1 &
echo $! > $PROJECT_HOME/pids/data_processor.pid
sleep 2
# 5. Start the web service
echo "Starting the web service..."
export PYTHONPATH=$PROJECT_HOME:$PYTHONPATH
nohup $PYTHON_ENV -m uvicorn web.app:app --host 0.0.0.0 --port 8080 > $PROJECT_HOME/logs/web_service.log 2>&1 &
echo $! > $PROJECT_HOME/pids/web_service.pid
sleep 5
echo "Platform startup complete!"
echo "Web UI: http://localhost:8080"
echo "Run $PROJECT_HOME/scripts/stop_project.sh to stop all services"
Answer: ____ ____ (1 point per blank, 2 points total)