Skip to content
BrushUP
返回

2025 睿抗 CAIP 大数据应用开发赛项决赛样题

睿抗CAIP强脑赛道

2025 RAICOM 睿抗机器人开发者大赛-大数据应用开发赛项决赛样题

项目背景与介绍

项目背景

在数字化电商时代,用户行为分析平台作为电商系统的核心组件,承担着用户画像分析、行为预测、个性化推荐、流失预警等重要功能 。本项目基于国产操作系统环境,采用现代化的大数据技术栈,构建了一个功能完整、性能优异的智能电商分析平台 。

项目介绍

本项目是一个基于大数据技术栈开发的电商用户行为分析平台,采用微服务架构设计 。系统具备以下核心功能 :

实时用户行为监控:基于 Kafka 的实时数据流处理、用户行为实时展示 。

数据分析引擎:基于 Spark 的离线数据处理、Hive 数据仓库分析 。

智能分析:基于机器学习的用户分群、流失预测、商品推荐 。

可视化展示:基于 ECharts 的多维度数据可视化、交互式图表 。

Web 服务:基于 FastAPI 的 RESTful API、实时数据推送 。

前端界面:响应式设计、实时数据更新、用户友好界面 。

技术栈

大数据平台:Hadoop HDFS + YARN + Spark + Hive + HBase + Kafka 。

后端开发:FastAPI + Pydantic + Uvicorn + SQLite/MariaDB 。

AI 算法:PyTorch + Scikit-learn + 机器学习预测模型 。

前端技术:HTML5 + CSS3 + JavaScript + ECharts 。

开发环境:Python 3.12 + Anaconda + Docker + Ubuntu 。

项目结构

Plaintext

ecommerce-user-behavior/
├── src/                          # 核心源码 [cite: 25]
│   ├── data_models.py            # 数据模型定义 [cite: 25]
│   ├── data_generator.py         # 实时数据生成器 [cite: 25]
│   ├── data_processor.py         # 数据处理器 [cite: 25]
│   └── ml_analyzer.py            # 机器学习分析器 [cite: 25]
├── web/                          # Web服务 [cite: 25]
│   ├── app.py                    # FastAPI主应用 [cite: 25]
│   └── static/                   # 静态文件 [cite: 25]
│       └── index.html            # 前端页面 [cite: 25]
├── config/                       # 配置文件 [cite: 28, 29]
│   ├── settings.py               # 项目配置 [cite: 30, 31]
│   └── config.py                 # 系统配置 [cite: 32, 33]
├── data/                         # 数据存储 [cite: 34, 35]
│   ├── raw/                      # 原始数据 [cite: 36, 37]
│   └── processed/                # 处理后数据 [cite: 38, 39]
├── models/                       # 模型文件 [cite: 40, 41]
├── scripts/                      # 脚本文件 [cite: 42, 43]
│   ├── start_project.sh          # 项目启动脚本 [cite: 44]
│   └── stop_project.sh           # 项目停止脚本 [cite: 45]
├── logs/                         # 日志文件 [cite: 46, 49]
├── requirements.txt              # Python依赖 [cite: 47, 50]
└── README.md                     # 项目说明 [cite: 48, 51]

实操比赛题目

模块一:大数据平台搭建 (20%)

任务一:基于国产操作系统搭建大数据平台

题目 1.1 在国产操作系统环境中,需要配置 Hadoop 环境变量。请完成以下操作:

Bash

# 1. 设置 JAVA_HOME 环境变量 [cite: 57]
export JAVA_HOME=____ [cite: 58]

# 2. 设置 HADOOP_HOME 环境变量 [cite: 60]
export HADOOP_HOME=____ [cite: 61]

# 3. 设置 HDFS 配置目录 [cite: 63]
export HADOOP_CONF_DIR=____ [cite: 64]

# 4. 将 Hadoop 命令添加到 PATH [cite: 66]
export PATH=$PATH:____/bin:____/sbin [cite: 69]

答案:____ ____ ____ ____ ____ (每空 1 分,共 5 分)

题目 1.2 在 Hadoop 配置文件中,需要设置正确的 HDFS 配置。请在 hdfs-site.xml 中补充:

XML

<property> [cite: 73]
    <name>dfs.replication</name> [cite: 76]
    <value>____</value> [cite: 77]
</property> [cite: 79]

<property> [cite: 80]
    <name>dfs.namenode.name.dir</name> [cite: 81]
    <value>____</value> [cite: 84]
</property> [cite: 90]

<property> [cite: 92]
    <name>dfs.datanode.data.dir</name> [cite: 87]
    <value>____</value> [cite: 88]
</property> [cite: 95]

<property> [cite: 96]
    <name>dfs.namenode.http-address</name> [cite: 101]
    <value>____</value> [cite: 102]
</property> [cite: 100]

答案:____ ____ ____ ____ (每空 1 分,共 4 分)


任务二:搭建离线处理平台

题目 2.1 在 Spark 配置中,需要设置正确的 Master URL。请在 spark-defaults.conf 中补充:

Properties

# Spark 配置 [cite: 107]
spark.master=____ [cite: 108]
spark.app.name=____ [cite: 109]
spark.driver.memory=____ [cite: 110]
spark.executor.memory=____ [cite: 111]

答案:____ ____ ____ ____ (每空 1 分,共 4 分)

题目 2.2 在 Hive 配置中,需要设置正确的元数据存储。请在 hive-site.xml 中补充:

XML

<property> [cite: 115]
    <name>javax.jdo.option.ConnectionURL</name> [cite: 118]
    <value>jdbc:derby:;databaseName=____;create=true</value> [cite: 119]
</property> [cite: 120]

<property> [cite: 123]
    <name>hive.metastore.warehouse.dir</name> [cite: 126]
    <value>____</value> [cite: 127]
</property> [cite: 128]

答案:____ ____ (每空 1 分,共 2 分)


任务三:搭建实时处理平台

题目 3.1 在 Kafka 配置中,需要设置正确的 Zookeeper 连接。请在 server.properties 中补充:

Properties

# Kafka 配置 [cite: 133]
broker.id=0 [cite: 135]
listeners=____://localhost:9092 [cite: 136]
log.dirs=____ [cite: 138]
zookeeper.connect=____ [cite: 139]

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 3.2 在 Kafka 主题创建中,需要设置正确的分区和副本数。请补充:

Bash

# 创建用户行为主题 [cite: 145]
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic user-behavior --partitions ____ --replication-factor ____ [cite: 146]

答案:____ ____ (每空 1 分,共 2 分)


模块二:数据处理 (25%)

任务四:数据抽取

题目 4.1 在数据生成器中,需要修复 JSON 序列化问题。请在 data_generator.py 中补充:

Python

# Kafka 生产者配置 [cite: 151]
self.producer = KafkaProducer( [cite: 152]
    bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'], [cite: 155]
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False, default=____).encode('utf-8') [cite: 156]
)

答案:____ (1 分)

题目 4.2 在用户行为数据生成中,需要添加正确的行为类型权重。请在 data_generator.py 中补充:

Python

def generate_user_behavior(self): [cite: 161]
    user = random.choice(self.users) [cite: 165]
    product = random.choice(self.products) [cite: 166]
    session = self._get_or_create_session(user.user_id) [cite: 167]
    
    # 行为类型权重 [cite: 169]
    behavior_weights = { [cite: 171]
        BehaviorType.VIEW: ____, [cite: 173]
        BehaviorType.CLICK: ____, [cite: 175]
        BehaviorType.ADD_TO_CART: ____, [cite: 177]
        BehaviorType.REMOVE_FROM_CART: ____, [cite: 179]
        BehaviorType.SEARCH: ____, [cite: 181]
        BehaviorType.FAVORITE: 0.05, [cite: 183]
        BehaviorType.SHARE: ____, [cite: 185]
        BehaviorType.REVIEW: 0.05 [cite: 187]
    }

答案:____ ____ ____ ____ ____ ____ (每空 1 分,共 6 分)

题目 4.3 在订单数据生成中,需要添加正确的订单状态选择。请补充:

Python

def generate_order(self): [cite: 197]
    user = random.choice(self.users) [cite: 199]
    if random.random() > DATA_GENERATION_CONFIG['order_probability']: [cite: 200]
        return None [cite: 202]
        
    # 选择 1-5 个商品 [cite: 205]
    num_items = random.randint(1, 5) [cite: 211]
    selected_products = random.sample(self.products, num_items) [cite: 212]
    order_id = str(uuid.uuid4()) [cite: 213]
    order_time = datetime.now() [cite: 214]
    
    # 计算订单金额 [cite: 217]
    total_amount = 0 [cite: 219]
    order_items = [] [cite: 221]
    
    for product in selected_products: [cite: 224]
        quantity = random.randint(1, 3) [cite: 227]
        unit_price = product.price [cite: 230]
        total_price = unit_price * quantity [cite: 231]
        total_amount += total_price [cite: 233, 234]
        
        order_item = OrderItem( [cite: 251]
            item_id=str(uuid.uuid4()), [cite: 252]
            order_id=order_id, [cite: 253]
            product_id=product.product_id, [cite: 254]
            quantity=quantity, [cite: 255]
            unit_price=unit_price, [cite: 256]
            total_price=total_price [cite: 257]
        )
        order_items.append(order_item) [cite: 258]
        
    # 计算优惠和最终金额 [cite: 259]
    discount_amount = total_amount * random.uniform(0, 0.3) [cite: 260]
    shipping_fee = random.uniform(0, 20) if total_amount < ____ else 0 [cite: 261]
    final_amount = total_amount - discount_amount + shipping_fee [cite: 263]
    
    order = Order( [cite: 266]
        user_id=user.user_id, [cite: 269]
        order_id=order_id, [cite: 277]
        order_time=order_time, [cite: 278]
        total_amount=round(total_amount, 2), [cite: 279]
        discount_amount=round(discount_amount, 2), [cite: 280]
        final_amount=round(final_amount, 2), [cite: 281]
        status=random.choice(list(____)), [cite: 282]
        payment_method=random.choice(self.payment_methods), [cite: 283]
        shipping_address=f"{random.choice(self.cities)}{random.randint(1, 10)}区{random.randint(1, 100)}号", [cite: 284, 285]
        shipping_fee=round(shipping_fee, 2), [cite: 291]
        coupon_code=random.choice([None, f"COUPON_{random.randint(1000, 9999)}"]), [cite: 292]
        notes=random.choice([None, "请尽快发货", "包装精美", "货到付款"]) [cite: 293]
    )
    return order, order_items [cite: 298]

答案:____ ____ (每空 1 分,共 2 分)

题目 4.4 在数据生成器初始化中,需要添加正确的设备类型列表。请补充:

Python

def __init__(self): [cite: 302]
    ____
    self.producer = KafkaProducer( [cite: 304]
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'], [cite: 306]
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False, default=str).encode('utf-8') [cite: 308, 310]
    )
    ____
    # 初始化基础数据 [cite: 313]
    self.cities = ['北京', '上海', '广州', '深圳', '杭州', '南京', '成都', '武汉', '西安', '重庆'] [cite: 315, 316]
    self.device_types = ____ [cite: 318]
    self.browsers = ['Chrome', 'Firefox', 'Safari', 'Edge', 'Opera'] [cite: 320]
    self.payment_methods = ['alipay', 'wechat', 'credit_card', 'debit_card', 'bank_transfer'] [cite: 322, 324]
    self._initialize_data() [cite: 325]

答案:____ ____ ____ (每空 1 分,共 3 分)


任务五:数据清洗

题目 5.1 在数据处理器中,需要添加正确的 Kafka 消费者配置。请在 data_processor.py 中补充:

Python

def __init__(self): [cite: 331]
    self.consumer_behavior = KafkaConsumer( [cite: 332, 333]
        KAFKA_CONFIG['user_behavior_topic'], [cite: 334]
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'], [cite: 335]
        auto_offset_reset=____, [cite: 336]
        enable_auto_commit=____, [cite: 337]
        group_id=____, [cite: 338]
        value_deserializer=lambda x: json.loads(x.decode('utf-8')) [cite: 339]
    )

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 5.2 在数据清洗中,需要添加正确的异常值处理。请补充:

Python

def process_behavior_message(self, message): [cite: 343]
    try: [cite: 344]
        behavior = UserBehavior(**message.value) [cite: 345, 346]
        
        # 数据清洗:去除异常值 [cite: 352]
        if behavior.duration and (behavior.duration < 0 or behavior.duration > ____): [cite: 358]
            return  # 跳过异常时长数据 [cite: 358]
            
        if behavior.properties and behavior.properties.get('time_on_page', 0) < 0: [cite: 370]
            behavior.properties['time_on_page'] = ____ [cite: 370]
            
        self.behavior_data.append(behavior.model_dump()) [cite: 353]
        
        # 保存到 CSV [cite: 364]
        df = pd.DataFrame([behavior.model_dump(mode='json')]) [cite: 371]
        df.to_csv(self.processed_data_path, mode=____, header=False, index=False) [cite: 372]
    except Exception as e: [cite: 373]
        print(f"Error processing behavior message: {e}") [cite: 374]

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 5.3 在数据分析中,需要添加正确的聚合统计。请补充:

Python

def _perform_analysis(self): [cite: 378]
    try: [cite: 406]
        df = pd.read_csv(self.processed_data_path) [cite: 407]
        if df.empty: [cite: 408]
            return [cite: 409]
            
        # 时间特征提取 [cite: 410]
        df['timestamp'] = pd.____(df['timestamp']) [cite: 411]
        df['hour'] = df['timestamp'].dt.____ [cite: 412]
        df['day_of_week'] = df['timestamp'].dt.____ [cite: 413]
        
        # 小时行为统计 [cite: 414]
        hourly_behavior = df.groupby('____')['behavior_type'].____().to_dict() [cite: 415]
        
        # 设备类型统计 [cite: 416]
        device_distribution = df.groupby('device_type')['behavior_type'].agg( [cite: 417]
            lambda x: x.____[0] if not x.mode().empty else 'unknown' [cite: 418]
        ).to_dict() [cite: 418]
        
        # 用户活跃度分析 [cite: 419]
        user_activity = df.groupby('user_id')['behavior_type'].____().to_dict() [cite: 420, 427]
        
        analysis_results = { [cite: 421]
            "timestamp": datetime.now().isoformat(), [cite: 422]
            "hourly_behavior": hourly_behavior, [cite: 423]
            "device_distribution": device_distribution, [cite: 424]
            "user_activity": user_activity [cite: 425]
        }
        
        with open(self.analysis_data_path, 'w') as f: [cite: 426]
            json.dump(analysis_results, f, ensure_ascii=False, indent=4) [cite: 428]
    except Exception as e: [cite: 429]
        print(f"Error performing analysis: {e}") [cite: 430]

答案:____ ____ ____ ____ ____ ____ ____ (每空 1 分,共 7 分)


模块三:数据挖掘 (15%)

任务六:特征工程

题目 6.1 在机器学习分析器中,需要添加正确的特征工程。请在 ml_analyzer.py 中补充:

Python

def _get_user_data(self): [cite: 438]
    try: [cite: 440]
        df_behaviors = pd.read_csv(os.path.join(DATA_DIR, 'user_behaviors.csv')) [cite: 442, 443]
        df_orders = pd.read_csv(os.path.join(DATA_DIR, 'order_events.csv')) [cite: 445, 470]
        df_profiles = pd.read_csv(os.path.join(DATA_DIR, 'user_profiles.csv')) [cite: 447, 471]
        
        # 转换时间戳 [cite: 469]
        df_behaviors['timestamp'] = pd.to_datetime(df_behaviors['timestamp']) [cite: 472]
        df_orders['order_date'] = pd.to_datetime(df_orders['order_date']) [cite: 472]
        df_profiles['last_active'] = pd.____(df_profiles['last_active']) [cite: 472]
        
        # 聚合行为数据 [cite: 473]
        user_activity = df_behaviors.groupby('user_id').agg( [cite: 474]
            total_views=('event_type', lambda x: (x == 'view').sum()), [cite: 475]
            total_clicks=('event_type', lambda x: (x == 'click').sum()), [cite: 475]
            total_add_to_cart=('event_type', lambda x: (x == 'add_to_cart').sum()), [cite: 476]
            total_purchases=('event_type', lambda x: (x == 'purchase').sum()), [cite: 477]
            last_behavior_time=('timestamp', 'max') [cite: 478]
        ).reset_index() [cite: 479]
        
        # 聚合订单数据 [cite: 480]
        user_orders = df_orders.groupby('user_id').agg( [cite: 481]
            total_order_count=('order_id', 'count'), [cite: 482]
            total_spent=('total_amount', 'sum'), [cite: 483]
            last_purchase_date=('order_date', 'max') [cite: 484]
        ).reset_index() [cite: 485]
        
        # 合并数据 [cite: 488]
        user_data = pd.merge(df_profiles, user_activity, on='user_id', how='left') [cite: 490, 496]
        user_data = pd.merge(user_data, user_orders, on='user_id', how='left') [cite: 492, 496]
        user_data = user_data.fillna(0)  # 填充 NaN [cite: 495, 497]
        
        # 特征工程 [cite: 500]
        user_data['days_since_last_active'] = (datetime.now() - user_data['last_active']).dt.days [cite: 502, 503]
        user_data['days_since_last_purchase'] = (datetime.now() - user_data['last_purchase_date']).dt.____ [cite: 505, 506]
        user_data['days_since_last_purchase'] = user_data['days_since_last_purchase'].fillna(user_data['days_since_last_active']) [cite: 508, 509]
        
        # 定义流失标签 [cite: 517]
        user_data['churn'] = (user_data['days_since_last_active'] > ____).astype(int) [cite: 513]
        return user_data [cite: 514]
    except FileNotFoundError: [cite: 515]
        print("Required data files not found for ML analysis.") [cite: 527]
        return pd.DataFrame() [cite: 528]
    except Exception as e: [cite: 529]
        print(f"Error loading or processing user data for ML: {e}") [cite: 530]
        return pd.DataFrame() [cite: 531]

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 6.2 在用户分群分析中,需要添加正确的聚类算法。请补充:

Python

def analyze_user_segmentation(self, n_clusters=4): [cite: 535]
    user_data = self._get_user_data() [cite: 537, 538]
    if user_data.empty: [cite: 540]
        return () [cite: 542]
        
    features = user_data[['total_views', 'total_clicks', 'total_purchases', 'total_spent', 'days_since_last_active']] [cite: 545, 546, 547]
    
    if self.scaler is None: [cite: 550]
        self.scaler = StandardScaler() [cite: 552]
        features_scaled = self.scaler.____(features) [cite: 554, 561]
    else: [cite: 556]
        features_scaled = self.scaler.transform(features) [cite: 558, 562]
        
    if self.user_segmentation_model is None: [cite: 563]
        self.user_segmentation_model = KMeans(n_clusters=n_clusters, random_state=____, n_init=10) [cite: 565, 566, 567]
        self.user_segmentation_model.____(features_scaled) [cite: 569, 570]
    else: [cite: 568]
        ____
        
    self._save_models() [cite: 572]
    
    user_data['segment'] = self.user_segmentation_model.____(features_scaled) [cite: 577]
    segment_analysis = user_data.groupby('segment').agg( [cite: 578]
        count=('user_id', 'count'), [cite: 580]
        avg_views=('total_views', 'mean'), [cite: 582, 583]
        avg_clicks=('total_clicks', 'mean'), [cite: 585]
        avg_purchases=('total_purchases', 'mean'), [cite: 587]
        avg_spent=('total_spent', 'mean'), [cite: 589]
        avg_days_inactive=('days_since_last_active', 'mean') [cite: 591]
    ).to_dict('index') [cite: 593]
    
    return user_data[['user_id', 'segment']].to_dict('records'), segment_analysis [cite: 597]

答案:____ ____ ____ ____ (每空 1 分,共 4 分)


任务七:模型训练

题目 7.1 在流失预测模型训练中,需要添加正确的数据分割。请补充:

Python

def train_churn_prediction_model(self): [cite: 603]
    user_data = self._get_user_data() [cite: 604]
    if user_data.empty or 'churn' not in user_data.columns or len(user_data['churn'].unique()) < 2: [cite: 605, 608]
        print("Not enough data or churn labels for training churn prediction model.") [cite: 614]
        return False [cite: 614]
        
    features = user_data[['total_views', 'total_clicks', 'total_purchases', 'total_spent', 'days_since_last_active', 'days_since_last_purchase']] [cite: 615, 616]
    labels = user_data['churn'] [cite: 618]
    
    X_train, X_test, y_train, y_test = train_test_split( [cite: 621]
        features, labels, test_size=____, random_state=____ [cite: 623, 624]
    ) [cite: 626]
    
    if self.scaler is None: [cite: 629]
        self.scaler = StandardScaler() [cite: 650]
        X_train_scaled = self.scaler.____(X_train) [cite: 632, 659]
        X_test_scaled = self.scaler.transform(X_test) [cite: 651]
    else: [cite: 635]
        X_train_scaled = self.scaler.transform(X_train) [cite: 637]
        X_test_scaled = self.scaler.transform(X_test) [cite: 652]
        
    self.churn_prediction_model = RandomForestClassifier( [cite: 653, 660]
        n_estimators=____, [cite: 642]
        random_state=42 [cite: 654]
    )
    self.churn_prediction_model.____(X_train_scaled, y_train) [cite: 655]
    y_pred = self.churn_prediction_model.____(X_test_scaled) [cite: 656]
    
    accuracy = accuracy_score(y_test, y_pred) [cite: 657]
    print(f"Churn prediction model trained with accuracy: {accuracy}") [cite: 658]
    self._save_models() [cite: 662]
    return True [cite: 664]

答案:____ ____ ____ ____ ____ ____ (每空 1 分,共 6 分)

题目 7.2 在商品推荐算法中,需要添加正确的推荐逻辑。请补充:

Python

def recommend_products(self, user_id: str, num_recommendations: int = 5): [cite: 668]
    user_data = self._get_user_data() [cite: 669]
    user_profile_row = user_data[user_data['user_id'] == user_id] [cite: 671]
    
    if user_profile_row.empty: [cite: 675]
        return {"user_id": user_id, "recommendations": [], "message": "User not found"} [cite: 677]
        
    try: [cite: 680]
        df_products = pd.read_csv(os.path.join(DATA_DIR, 'product_events.csv')) [cite: 684, 685]
    except FileNotFoundError: [cite: 687]
        print("Product data file not found for recommendations.") [cite: 690]
        return {"user_id": user_id, "recommendations": [], "message": "Product data not available"} [cite: 691, 692]
        
    favorite_categories = eval(user_profile_row['favorite_categories'].iloc[0]) [cite: 693]
    recommended_products = [] [cite: 694]
    for category in favorite_categories: [cite: 695]
        category_products = df_products[df_products['category'] == category].nlargest( [cite: 696, 698]
            num_recommendations, [cite: 699]
            '____'
        )
        recommended_products.extend(category_products['product_id'].tolist()) [cite: 700]
        
    # 如果推荐不足,用高评分商品补充 [cite: 727]
    if len(recommended_products) < num_recommendations: [cite: 728, 729]
        top_products = df_products.nlargest(num_recommendations, '____')['product_id'].tolist() [cite: 730, 718]
        for prod_id in top_products: [cite: 720]
            if prod_id not in recommended_products: [cite: 731]
                recommended_products.append(prod_id) [cite: 732]
            if len(recommended_products) >= num_recommendations: [cite: 733]
                break [cite: 734]
                
    return {"user_id": user_id, "recommendations": recommended_products[:num_recommendations]} [cite: 735]

答案:____ ____ (每空 1 分,共 2 分)


模块四:数据采集与实时计算 (20%)

任务八:数据采集

题目 8.1 在 Kafka 主题管理中,需要添加正确的主题创建逻辑。请在 start_project.sh 中补充:

Bash

# 检查 Kafka 主题是否存在,如果不存在则创建 [cite: 741]
KAFKA_TOPICS=("user-behavior" "order-events" "product-events" "user-profile") [cite: 742]

for TOPIC in "${KAFKA_TOPICS[@]}"; do [cite: 744]
    EXISTS=$(/home/zkpk/bigdata/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | grep -w "$TOPIC") [cite: 746, 747]
    if [ -z "$EXISTS" ]; then [cite: 749]
        echo "创建 Kafka 主题: $TOPIC" [cite: 751]
        /home/zkpk/bigdata/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic "$TOPIC" --partitions ____ --replication-factor ____ [cite: 753, 754]
    else [cite: 756]
        echo "Kafka 主题 '$TOPIC' 已存在。" [cite: 758]
    fi
done [cite: 763]

答案:____ ____ (每空 1 分,共 2 分)

题目 8.2 在数据生成器启动中,需要添加正确的环境变量设置。请补充:

Bash

# 启动数据生成器 [cite: 767]
echo "启动用户行为数据生成器..." [cite: 768]
____ "$PROJECT_HOME/src/data_generator.py" > "$LOGS_DIR/data_generator.log" 2>&1 & [cite: 770, 771]
echo $! > "$PID_DIR/____" [cite: 773]

答案:____ ____ (每空 1 分,共 2 分)


任务九:数据处理

题目 9.1 在 Web 应用中,需要添加正确的 Kafka 消费者线程。请在 app.py 中补充:

Python

def consume_kafka_messages(): [cite: 781]
    global realtime_metrics
    consumer_behavior = KafkaConsumer( [cite: 782, 784]
        KAFKA_CONFIG['____'], [cite: 783]
        bootstrap_servers=KAFKA_CONFIG['bootstrap_servers'], [cite: 785]
        auto_offset_reset='____', [cite: 787]
        enable_auto_commit=____, [cite: 791]
        group_id='fastapi-behavior-consumer', [cite: 792]
        value_deserializer=lambda x: json.loads(x.decode('utf-8')) [cite: 793]
    )
    print("开始消费 Kafka 数据...") [cite: 797]
    
    for message in consumer_behavior.poll(timeout_ms=100).values(): [cite: 800]
        try: [cite: 803]
            for msg in message: [cite: 820]
                behavior = UserBehavior(**msg.value) [cite: 821]
                realtime_behaviors.append(behavior.model_dump()) [cite: 822]
                
                # 更新实时指标 [cite: 823]
                realtime_metrics["behavior_distribution"][behavior.event_type] += 1 [cite: 824]
                realtime_metrics["device_distribution"][behavior.device_type] += 1 [cite: 824]
                realtime_metrics["hourly_activity"][behavior.timestamp.hour] += 1 [cite: 824]
                
                if behavior.event_type == 'view': [cite: 825]
                    realtime_metrics["total_views"] += 1 [cite: 826]
                elif behavior.event_type == 'click': [cite: 827]
                    realtime_metrics["total_clicks"] += 1 [cite: 828]
                elif behavior.event_type == 'add_to_cart': [cite: 828]
                    realtime_metrics["total_add_to_cart"] += 1 [cite: 828]
                elif behavior.event_type == 'purchase': [cite: 828]
                    realtime_metrics["total_purchases"] += 1 [cite: 828]
        except Exception as e: [cite: 832]
            print(f"Error processing behavior message in FastAPI: {e}, Message: {msg.value}") [cite: 833, 834]

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 9.2 在实时数据 API 中,需要添加正确的数据返回格式。请补充:

Python

@app.get("/api/realtime_metrics") [cite: 838]
async def get_realtime_metrics(): [cite: 840]
    # 确保小时活动覆盖所有 24 小时 [cite: 843]
    full_hourly_activity = ((h, realtime_metrics["hourly_activity"].get(h, 0)) for h in range(____)) [cite: 844, 845]
    
    return { [cite: 848]
        "total_views": realtime_metrics["____"], [cite: 850]
        "total_clicks": realtime_metrics["total_clicks"], [cite: 852]
        "total_add_to_cart": realtime_metrics["total_add_to_cart"], [cite: 854]
        "total_purchases": realtime_metrics["____"], [cite: 856]
        "total_orders": realtime_metrics["total_orders"], [cite: 858]
        "total_revenue": round(realtime_metrics["total_revenue"], 2), [cite: 860]
        "behavior_distribution": dict(realtime_metrics["behavior_distribution"]), [cite: 862]
        "category_distribution": dict(realtime_metrics["category_distribution"]), [cite: 864]
        "device_distribution": dict(realtime_metrics["device_distribution"]), [cite: 867]
        "hourly_activity": dict(sorted(____)) [cite: 868]
    }

答案:____ ____ ____ ____ (4 分)

题目 9.3 在机器学习分析 API 中,需要添加正确的用户分群接口。请补充:

Python

@app.get("/api/user_segmentation") [cite: 875]
async def get_user_segmentation(): [cite: 877]
    try: [cite: 879]
        user_segments, segment_summary = ml_analyzer.____() [cite: 881, 882]
        return {"user_segments": user_segments, "segment_summary": segment_summary} [cite: 887]
    except Exception as e: [cite: 885]
        raise HTTPException(status_code=500, detail=f"User segmentation failed: {e}") [cite: 888]

@app.get("/api/churn_prediction/{user_id}") [cite: 891]
async def get_churn_prediction(user_id: str): [cite: 893]
    try: [cite: 895]
        churn_risk = ml_analyzer.____(user_id) [cite: 897, 898]
        return churn_risk [cite: 900]
    except Exception as e: [cite: 902]
        raise HTTPException(status_code=500, detail=f"Churn prediction failed for user {user_id}: {e}") [cite: 904, 905]

@app.get("/api/product_recommendations/{user_id}") [cite: 909]
async def get_product_recommendations(user_id: str): [cite: 914]
    try: [cite: 916]
        recommendations = ml_analyzer.____(user_id) [cite: 910]
        return recommendations [cite: 911]
    except Exception as e: [cite: 912]
        raise HTTPException(status_code=500, detail=f"Product recommendations failed for user {user_id}: {e}") [cite: 924]

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 9.4 在数据流处理中,需要添加正确的实时统计更新。请补充:

Python

def update_traffic_stats(data): [cite: 926]
    global realtime_metrics [cite: 927]
    ____
    behavior_type = data.get('behavior_type', "unknown") [cite: 943]
    realtime_metrics["behavior_distribution"][behavior_type] += 1 [cite: 944]
    
    # 更新设备分布统计 [cite: 945]
    device_type = data.get('device_type', "unknown") [cite: 946]
    realtime_metrics["device_distribution"][device_type] += 1 [cite: 947]
    
    # 更新小时活跃度统计 [cite: 940]
    timestamp = data.get('timestamp', datetime.now()) [cite: 948]
    if isinstance(timestamp, str): [cite: 950]
        timestamp = datetime.____(timestamp) [cite: 952, 953]
        
    hour = timestamp.____ [cite: 955, 956]
    realtime_metrics["hourly_activity"][hour] += 1 [cite: 958]
    
    # 更新总计数 [cite: 961]
    if behavior_type == 'view': [cite: 963]
        realtime_metrics["total_views"] += 1 [cite: 965]
    elif behavior_type == 'click': [cite: 967]
        realtime_metrics["total_clicks"] += 1 [cite: 969]
    elif behavior_type == 'add_to_cart': [cite: 971]
        realtime_metrics["total_add_to_cart"] += 1 [cite: 973]
    elif behavior_type == 'purchase': [cite: 978]
        realtime_metrics["total_purchases"] += 1 [cite: 979]
        realtime_metrics["total_orders"] += 1 [cite: 980]
        realtime_metrics["total_revenue"] += data.get('total_amount', 0) [cite: 981]

答案:____ ____ ____ (3 分)

题目 9.5 在数据持久化中,需要添加正确的数据保存逻辑。请补充:

Python

def save_data_periodically(self): [cite: 985]
    if self.behavior_data: [cite: 987]
        df_behavior = pd.DataFrame(self.behavior_data) [cite: 991]
        df_behavior['timestamp'] = pd.to_datetime(df_behavior['timestamp']) [cite: 992]
        df_behavior.to_csv( [cite: 992]
            os.path.join(DATA_DIR, 'user_behaviors.csv'), [cite: 992]
            mode=____, [cite: 994]
            header=not os.path.exists(os.path.join(DATA_DIR, 'user_behaviors.csv')), [cite: 996, 997]
            index=False [cite: 999]
        )
        self.behavior_data = []  # 清空数据 [cite: 1001]
        
    if self.order_data: [cite: 1004]
        df_orders = pd.DataFrame(self.order_data) [cite: 1006]
        df_orders['order_date'] = pd.to_datetime(df_orders['order_date']) [cite: 1009]
        df_orders.to_csv( [cite: 1010]
            os.path.join(DATA_DIR, 'order_events.csv'), [cite: 1010]
            mode=____, [cite: 1012]
            header=not os.path.exists(os.path.join(DATA_DIR, 'order_events.csv')), [cite: 1014, 1015]
            index=False [cite: 1017]
        )
        self.order_data = []  # 清空数据 [cite: 1024]
        
    if self.user_profile_data: [cite: 1025]
        df_user_profiles = pd.DataFrame(list(self.user_profile_data.values())) [cite: 1026]
        df_user_profiles['last_active'] = pd.to_datetime(df_user_profiles['last_active']) [cite: 1027, 1028]
        df_user_profiles.to_csv( [cite: 1029]
            os.path.join(DATA_DIR, 'user_profiles.csv'), [cite: 1029]
            mode=____, [cite: 1030]
            index=False [cite: 1031]
        )  # 覆盖保存最新状态 [cite: 1031]

答案:____ ____ ____ (每空 1 分,共 3 分)


模块五:数据可视化 (10%)

任务十:制作图表

题目 10.1 在前端 JavaScript 中,需要添加正确的图表初始化。请在 index.html 中补充:

JavaScript

// 初始化图表 [cite: 1037]
function initCharts() { [cite: 1038]
    charts.behaviorDistribution = echarts.init(document.getElementById('behaviorDistributionChart')); [cite: 1040, 1041]
    charts.deviceDistribution = echarts.____(document.getElementById('deviceDistributionChart')); [cite: 1045, 1046]
    charts.hourlyActivity = echarts.init(document.getElementById('hourlyActivityChart')); [cite: 1047, 1048]
    charts.userSegmentation = echarts.init(document.getElementById('userSegmentationChart')); [cite: 1049, 1050]
    
    // 响应式调整 [cite: 1051]
    window.addEventListener('resize', function() { [cite: 1052]
        Object.values(charts).forEach(chart => chart.____()); [cite: 1057]
    }); [cite: 1059]
}

答案:____ ____ (每空 1 分,共 2 分)

题目 10.2 在用户 behavior 分布图表中,需要添加正确的饼图配置。请补充:

JavaScript

// 更新用户行为分布图表 [cite: 1066]
function updateBehaviorChart(data) { [cite: 1067]
    const behaviorData = Object.entries(data.behavior_distribution).map(([name, value]) => ({ [cite: 1071, 1072]
        name: name === 'view' ? '预览' : [cite: 1073]
              name === 'click' ? '点击' : [cite: 1074]
              name === 'add_to_cart' ? '加购物车' : [cite: 1075]
              name === 'purchase' ? '购买' : [cite: 1076, 1077]
              name === 'favorite' ? '收藏' : '分享', [cite: 1078]
        value: ____
    })); [cite: 1089]
    
    charts.behaviorDistribution.setOption({ [cite: 1079]
        title: { text: '用户行为分布', left: 'center', show: false }, [cite: 1080]
        tooltip: { trigger: 'item' }, [cite: 1081]
        legend: { orient: 'vertical', left: 'left' }, [cite: 1082]
        series: [{ [cite: 1083]
            name: '行为类型', [cite: 1101]
            type: '____', [cite: 1102]
            radius: '____', [cite: 1103]
            data: behaviorData, [cite: 1104]
            emphasis: { [cite: 1105]
                itemStyle: { [cite: 1105]
                    shadowBlur: 10, [cite: 1105]
                    shadowOffsetX: 0, [cite: 1105]
                    shadowColor: 'rgba(0, 0, 0, 0.5)' [cite: 1106]
                } [cite: 1106]
            } [cite: 1106]
        }]
    }); [cite: 1109]
}

答案:____ ____ ____ (每空 1 分,共 3 分)

题目 10.3 在 24 小时活跃度趋势图表中,需要添加正确的时间序列配置。请补充:

JavaScript

// 更新 24 小时活跃度趋势图表 [cite: 1114]
function updateHourlyActivityChart(data) { [cite: 1115]
    const hours = Object.keys(data.hourly_activity).map(hour => `${hour}:00`); [cite: 1126]
    const activityData = Object.values(data.hourly_activity); [cite: 1127]
    
    charts.hourlyActivity.setOption({ [cite: 1128]
        title: { text: '24小时活跃度趋势', left: 'center', show: false }, [cite: 1129]
        tooltip: { trigger: 'axis' }, [cite: 1130]
        xAxis: { type: 'category', data: hours }, [cite: 1131]
        yAxis: { type: 'value', name: '活跃度' }, [cite: 1132]
        series: [{ [cite: 1133]
            name: '活跃度', [cite: 1134]
            type: '____', [cite: 1148]
            data: activityData, [cite: 1149]
            smooth: ____, [cite: 1150]
            itemStyle: { color: '#91cc75' } [cite: 1151]
        }]
    }); [cite: 1144]
}

答案:____ ____ (每空 1 分,共 2 分)


任务十一:分析数据

题目 11.1 在实时监控功能中,需要添加正确的定时更新逻辑。请补充:

JavaScript

// 开始实时监控 [cite: 1154]
function startRealTime() { [cite: 1155]
    if (realTimeInterval) clearInterval(realTimeInterval); [cite: 1157]
    
    realTimeInterval = setInterval(async () => { [cite: 1164]
        try { [cite: 1161]
            const response = await fetch('____'); [cite: 1163, 1165]
            const data = await response.json(); [cite: 1167]
            
            if (data.____ === 'success') { [cite: 1171]
                updateBehaviorChart(data); [cite: 1173]
                updateDeviceChart(data); [cite: 1175]
                updateHourlyActivityChart(data); [cite: 1177]
                
                // 更新指标显示 [cite: 1180]
                document.getElementById('totalViews').innerText = data.total_views; [cite: 1182, 1183]
                document.getElementById('totalClicks').innerText = data.total_clicks; [cite: 1185]
                document.getElementById('totalPurchases').innerText = data.total_purchases; [cite: 1187, 1188]
                document.getElementById('totalRevenue').innerText = data.total_revenue.toFixed(2); [cite: 1190, 1191]
                
                document.getElementById('status').textContent = '状态:实时监控中 - ' + new Date().toLocaleTimeString(); [cite: 1194, 1195]
            }
        } catch (error) { [cite: 1199]
            console.error('实时监控出错:', error); [cite: 1207]
        }
    }, ____); // 每 3 秒更新一次 [cite: 1208]
    
    document.getElementById('startBtn').disabled = true; [cite: 1209]
    document.getElementById('stopBtn').disabled = false; [cite: 1210]
    document.getElementById('status').textContent = '状态:实时监控已启动'; [cite: 1211]
}

答案:____ ____ ____ (每空 1 分,共 3 分)


模块六:综合分析任务 (5%)

任务十二:分析数据

题目 12.1 在系统健康检查中,需要添加正确的服务状态监控。请在 app.py 中补充:

Python

@app.get("/api/health") [cite: 1221]
async def health_check(): [cite: 1223]
    kafka_status = "healthy" [cite: 1227]
    try: [cite: 1229]
        # 尝试列出主题来检查 Kafka 连接 [cite: 1231]
        ____ [cite: 1233, 1234]
    except Exception: [cite: 1236]
        kafka_status = "unhealthy" [cite: 1238]
        
    ml_model_status = "healthy" if ml_analyzer.user_segmentation_model and ml_analyzer.churn_prediction_model else "uninitialized" [cite: 1241, 1242]
    
    return { [cite: 1245]
        "status": "ok", [cite: 1247]
        "kafka_status": kafka_status, [cite: 1249]
        "ml_models": ml_model_status, [cite: 1251]
        "timestamp": datetime.now() [cite: 1253]
    } [cite: 1255]

答案:____ (1 分)

题目 12.2 在数据分析 API 中,需要添加正确的统计计算。请补充:

Python

@app.get("/api/behavior/analysis") [cite: 1259]
async def get_behavior_analysis(): [cite: 1269]
    try: [cite: 1269]
        return { [cite: 1270]
            "status": "____", [cite: 1271]
            "data": { [cite: 1272]
                "behavior_distribution": realtime_metrics["behavior_distribution"], [cite: 1273]
                "device_distribution": realtime_metrics["behavior_distribution"], [cite: 1273]
                "hourly_activity": [ [cite: 1273]
                    {
                        "hour": f"{i}:00", [cite: 1274]
                        "activity": realtime_metrics["hourly_activity"].get(i, 0) [cite: 1275]
                    } for i in range(____) [cite: 1276]
                ]
            }
        } [cite: 1284]
    except Exception as e: [cite: 1285]
        raise HTTPException(status_code=500, detail=str(e)) [cite: 1286]

答案:____ ____ (每空 1 分,共 2 分)


任务十三:提出方案

题目 13.1 在项目启动脚本中,需要添加正确的服务启动顺序。请在 start_project.sh 中补充:

Bash

#!/bin/bash [cite: 1292]

PROJECT_HOME="/home/zkpk/projects/ecommerce-user-behavior" [cite: 1294]
PYTHON_ENV="/home/zkpk/Anaconda3/bin/python" [cite: 1296]
KAFKA_HOME="/home/zkpk/bigdata/kafka" [cite: 1297]

echo "启动电商用户行为分析平台..." [cite: 1299]

# 1. 检查大数据 environment 状态 [cite: 1301]
echo "检查大数据环境状态..." [cite: 1302]
/home/zkpk/bigdata/check-bigdata.sh [cite: 1303]
if [ $? -ne 0 ]; then [cite: 1304]
    echo "大数据环境检查失败,请确保所有组件已正确启动。" [cite: 1309]
    exit ____ [cite: 1310]
fi [cite: 1307]

# 2. 检查并创建 Kafka 主题 [cite: 1311]
echo "检查 Kafka 主题..." [cite: 1312]
export PATH=$PATH:$KAFKA_HOME/bin [cite: 1313]
____

# 3. 启动数据生成器 [cite: 1315]
echo "启动用户行为数据生成器..." [cite: 1316]
nohup $PYTHON_ENV $PROJECT_HOME/src/data_generator.py > $PROJECT_HOME/logs/data_generator.log 2>&1 & [cite: 1317, 1318]
echo $! > $PROJECT_HOME/pids/data_generator.pid [cite: 1319]
sleep 2 [cite: 1321]

# 4. 启动数据处理器 [cite: 1324]
echo "启动数据处理器..." [cite: 1325]
nohup $PYTHON_ENV $PROJECT_HOME/src/data_processor.py > $PROJECT_HOME/logs/data_processor.log 2>&1 & [cite: 1326, 1327]
echo $! > $PROJECT_HOME/pids/data_processor.pid [cite: 1328]
sleep 2 [cite: 1329]

# 5. 启动 Web 服务 [cite: 1331]
echo "启动 Web 服务..." [cite: 1332]
export PYTHONPATH=$PROJECT_HOME:$PYTHONPATH [cite: 1333]
nohup $PYTHON_ENV -m uvicorn web.app:app --host 0.0.0.0 --port 8080 > $PROJECT_HOME/logs/web_service.log 2>&1 & [cite: 1334, 1335]
echo $! > $PROJECT_HOME/pids/web_service.pid [cite: 1336]
sleep 5 [cite: 1337]

echo "平台启动完成!" [cite: 1338]
echo "Web 界面: http://localhost:8080" [cite: 1341]
echo "使用 $PROJECT_HOME/stop_project.sh 停止所有服务" [cite: 1342]

答案:____ ____ (每空 1 分,共 2 分)


睿抗CAIP强脑赛道
分享本文到:

上一篇
【班级管理实务】精简笔记
下一篇
2024 睿抗 CAIP 大数据应用开发赛项决赛样题