From a321cc4c3ffb2ba4b6741ef3bab534157a5fae50 Mon Sep 17 00:00:00 2001 From: Zihan Xiao <98075925+zihanxiao23@users.noreply.github.com> Date: Mon, 9 Dec 2024 22:59:05 +0000 Subject: [PATCH] debug --- src/app.py | 19 +++++++++++++++---- src/load_test.py | 6 ++++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/app.py b/src/app.py index 316d068..bbcb6aa 100644 --- a/src/app.py +++ b/src/app.py @@ -2,12 +2,16 @@ from pydantic import BaseModel from pyspark.sql import SparkSession from pyspark.sql.functions import col, avg, count +from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType # Initialize FastAPI app = FastAPI() # Initialize SparkSession -spark = SparkSession.builder.appName("microservice").getOrCreate() +spark = SparkSession.builder \ + .appName("microservice") \ + .master("local[*]") \ + .getOrCreate() # Reduce Spark log verbosity spark.sparkContext.setLogLevel("WARN") @@ -18,14 +22,21 @@ class Data(BaseModel): gender: str salary: float +# Define schema for Spark DataFrame +schema = StructType([ + StructField("id", IntegerType(), True), + StructField("gender", StringType(), True), + StructField("salary", FloatType(), True) +]) + @app.post("/process") async def process_data(data: list[Data]): """ Receive JSON data stream and return analysis results """ try: - # Convert input data to Spark DataFrame - df = spark.createDataFrame([d.dict() for d in data]) + # Convert input data to Spark DataFrame with schema + df = spark.createDataFrame([d.dict() for d in data], schema=schema) # Perform simple data analysis result = df.groupBy("gender").agg( avg("salary").alias("average_salary"), @@ -35,4 +46,4 @@ async def process_data(data: list[Data]): result_json = [{"gender": row["gender"], "average_salary": row["average_salary"], "count": row["count"]} for row in result] return result_json except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/src/load_test.py b/src/load_test.py index 0191bcb..3a18d7b 100644 --- a/src/load_test.py +++ b/src/load_test.py @@ -1,9 +1,11 @@ from locust import HttpUser, task, between +import random class LoadTest(HttpUser): wait_time = between(1, 2) @task def process_data(self): - data = [{"id": 1, "gender": "M", "salary": 5000}] - self.client.post("/process", json=data, timeout=10) + # Generate a random batch of data + data = [{"id": i, "gender": random.choice(["M", "F"]), "salary": random.uniform(3000, 10000)} for i in range(100)] + self.client.post("/process", json=data, timeout=20) \ No newline at end of file