访谈的应用的并发测试

目的

我们有一个访谈的应用,上线前需要进行并发测试,查看并发是否满足大量用户同时进行访谈的需求。

步骤

整个访谈的流程包括,AI发起问题,用户回答问题,然后这样的循环,直到AI提出访谈结束。我们不检测用户回答问题的耗时,只检测AI收到用户问题后进行回答的时间。这个AI回答时间包括,收集数据库必要信息时间,调用LLM回答时间,存储数据时间。这期间进行了多次数据查询和至少1次LLM询问,多次存储时间,为了衡量哪些地方耗时较长,我们需要对每个步骤进行最小粒度的耗时检测,需要在程序的对应位置加上断点。

并发测试程序

我们写了1个类,去并发测试,首先我们的服务器端是通过nginx进行负载均衡,后面有2个相同的服务器,使用IP轮询平衡策略。

我们的主程序是performance_main,测试前为了防止mysql的线程池超标,我们需要实现调整mysql线程池,初始化不同的LLM模型,获取已有的数字人,然后创建和并发相同数量的线程,每个线程对应1个数字人,进行访谈,然后访谈结果查询和导出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class ConcurrencyInterview():
def __init__(self, host="127.0.0.1", port=6108, concurrent_num=100):
"""
Args:
host (): idc
port ():26109
concurrent_num (): 并发测试的数量
"""
self.host = host
self.port = port
self.concurrent_num = concurrent_num # Number of concurrent requests
self.inital_api_url(host, port)
def inital_api_url(self, host, port):
self.api_url = f'http://{host}:{port}'
self.ping_server()

def ping_server(self):
"""
curl http://111.207.114.228:17087/ping
测试Ping接口
:return:
"""
def get_robot_roles(self):
"""
获取用户设定的最终访谈设定
:return:
"msg": "success",
"data": [
{
"id": 11,
"name": "Hudson",
"attributes": []
},
{
"id": 12,
"name": "Cindy",
"attributes": []
}
]
"""


def get_llm_configs(self,host,port):
"""
获取所有llm的配置信息
:return:
"""

def robot_interview(self, messages, robot_id, project_id, study_id, session_id, language, save_records,
usecache=True):
"""
开始进行访谈,获取第1个问题
:return:


def loop_one_robot_interview(self, robot_name, robot_id, project_id, study_id, session_id, language="Chinese"):
"""
开始进行访谈,获取第1个问题
:return:
"""

def clean_session_record(self, session_id, db_online, db_manager):
"""

Args:
session_id ():
db_online: 一些在线库
db_manager: 一些管理库
Returns:
"""
query_params={"session_id": session_id})
def clean_concurrency_record(self, db_online, db_manager):
"""

Args:
最终清理所有的concurrency开头的session_id的数据
Returns:
"""
# 清空所有concurrency开头的session_id的数据
# 查询后清理

def performance_main(self, project_id, study_id, pool=50, llm=["glm","glm"], concurrent_num=None):
"""
Args:
project_id ():
study_id ():
concurrent_num ():
use_yield_return (): 是否使用生成器模式返回数据
Returns:
"""


def performance_prepare(self, pool=50, llm=["glm","glm"], sql_host="onlinedev"):
"""
检查配置信息,
Returns:
"""

def performance_end(self, configs):
"""
恢复配置信息
Returns:
configs: 来自self.source_config
"""


def gr_server_config_change(self):
"""
修改gr_server的配置信息
Returns:
"""
# 先改小线上的进程池,防止mysql超负荷


def reinitial_instance(self,host, port, pool=50, llm="glm", sql_host="onlinedev"):
"""
使用线上库的,先改成测试库,然后进行并发测试,测试完成后在改成线上库
重新初始化interview的实例
:return:
"""



def export_data_to_local(self,save_dir="Server_pool_50_user_100"):
"""
导出mysql数据到本地,用于这次测试的结果的记录
Returns:
"""


绘图程序

为了更好的绘图,我们需要整理数据,包括整理导出的并发测试数据和实时进行线上按项目查询时的数据统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class ConcurrencyPlot():
def __init__(self):
data_dir = "/Users/admin/Documents/lavector/访谈/performance"
def prepare_data(self):
"""
读取数据和预处理
包含数据文件TempRecording.xlsx
interview_record_detail.xlsx
interview_record.xlsx
TempUserRecords.xlsx
Returns:


def process_data_to_plot_data(self,TempUserRecords,interview_record,interview_record_detail, start_session_with_concurrency=True):
"""
处理数据成绘图格式的数据
Returns:
对应不同表的数据
TempUserRecords
interview_record
interview_record_detail
"""
one_data = {}
# 处理数据, 首先读取interview_record文件,过滤出session_id是concurrency开头的,这些是测试的记录
interview_record_concurrency = []
for one_record_data in interview_record:
session_id = one_record_data["session_id"]
if start_session_with_concurrency:
if isinstance(session_id, str) and session_id.startswith("concurrency"):
one_data_copy = copy.deepcopy(one_record_data)
interview_record_concurrency.append(one_data_copy)
else:
if isinstance(session_id, str): # 不加过滤的情况
one_data_copy = copy.deepcopy(one_record_data)
interview_record_concurrency.append(one_data_copy)
# session_id的格式是上面测试的测试时写入的格式, session_id = f"concurrency_{user}_{project_id}_{study_id}_{used_llm}"
if len(interview_record_concurrency) == 0:
return False, f"注意,数据interview_record中不包含测试concurrency开头的用户数据"
# 检查interview_record_concurrency中是否只有一个study_id,如果有多个,说明数据有问题
interview_record_concurrency_study_id = set([r["study_id"] for r in interview_record_concurrency])
if len(interview_record_concurrency_study_id) != 1:
return False, "注意,数据interview_record中包含多个study_id,请检查是否是同一个项目"
# interview_record_concurrency变成列表格式
for r in interview_record_concurrency:
session_id = r["session_id"]
if start_session_with_concurrency:
_, user, project_id, study_id, used_llm = session_id.split("_")
r["used_llm"] = used_llm
r["user"] = user
r["project_id"] = project_id
else:
r["used_llm"] = "unknown"
r["user"] = session_id
r["project_id"] = "unknown"
interview_record_concurrency_project_id = set([r["project_id"] for r in interview_record_concurrency])
if len(interview_record_concurrency_project_id) != 1:
return False,"注意,数据interview_record中包含多个project_id,请检查是否是同一个项目"
one_data["project_id"] = interview_record_concurrency_project_id.pop()
one_data["study_id"] = interview_record_concurrency_study_id.pop()
for r in interview_record_concurrency:
session_id = r["session_id"]
if r["status"] != 2: # 访谈没有结束,那么不用计算耗时
continue
# 每个用户的耗时, TempRecording中的start_time和end_time和这个时间时一致的
start_time = r["start_time"]
# 变成字符串
r["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time = r["end_time"]
# 变成字符串
r["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
# 将时间字符串转换为datetime对象,然后计算差值
# start_time = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
# end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
time_diff = end_time - start_time
# 将时间差转换为秒数
time_diff_seconds = time_diff.total_seconds()
r["cost_time"] = time_diff_seconds # 这个用户的整个访谈的耗时
# 计算每道题的耗时,
# 根据record_id,查询这个用户的访谈记录
record_id = r["record_id"]
interview_record_detail_one_user = [d for d in interview_record_detail if d["record_id"] == record_id]
if len(interview_record_detail_one_user) == 0:
logging.warning(f"注意,{session_id}对应的访谈记录没有在interview_record_detail中找到,请检查表interview_record_detail")
continue
# return False, f"注意,{session_id}对应的访谈记录没有在interview_record_detail中找到,请检查表interview_record_detail"
r["question_num"] = len(interview_record_detail_one_user) # 问答数量
# 每个问题的平均耗时,首先对interview_record_detail_one_user进行按id排序
interview_record_detail_one_user.sort(key=lambda x: x["id"])
# 过滤出TempUserRecords中的记录
TempUserRecords_one_user = [d for d in TempUserRecords if d["user_id"] == session_id]
# 必须仅有一条
if len(TempUserRecords_one_user) != 1:
logging.warning(
f"注意,数据TempUserRecords中包含0个或多个{session_id}对应的记录,请检查是否是同一个session_id回答了多次,现有数量:{len(TempUserRecords_one_user)}")
continue
# 获取QA中的内容
QA = TempUserRecords_one_user[0]["QA"] # 这个用户的所有问答记录
QA_list = json.loads(QA)
# 然后计算每个问题的耗时,分为用户耗时和AI回答耗时,还有总的耗时
User_QA_Pairs = []
for qa_info in QA_list:
QA_Pairs = qa_info.get("QA")
if not QA_Pairs:
continue
ai_step_cost_time = qa_info["ai_step_cost_time"]
# 好像有点问题,最后一次回答的end时,会多存储1次ai_step_cost_time中的数字
if len(QA_Pairs) != len(ai_step_cost_time):
logging.info(f"QA_Pairs的数量和ai_step_cost_time的数量不一致,问答对的数量应该和ai的回答计时数量一致,应该是最后一次回答时添加的")
# 去掉最后一个
ai_step_cost_time = ai_step_cost_time[:-1]
if len(QA_Pairs) != len(ai_step_cost_time):
return False, f"数据{session_id}中的ai_step_cost_time和QA_Pairs去掉最后一个后,仍然不一致,请检查"
for qa_pair, step_cost_time in zip(QA_Pairs, ai_step_cost_time): #step_cost_time代表访谈人生成的时间
ai_cost_time = 0
for one_message in qa_pair:
if one_message["role"] == "assistant":
# AI回答的耗时
ai_cost_time = one_message.get("cost_time") # ai回答耗时
if ai_cost_time is None:
logging.warning(f"注意,跳过这条记录,{session_id}对应的访谈记录中,AI回答的耗时为空,请检查,应该是线上没有这个记录")
ai_cost_time = 0
ai_response_length = len(one_message["content"]) # 文本的长度
one_message["response_length"] = ai_response_length
one_message["step_cost_time"] = step_cost_time # 只有ai访谈主持人有这个时间
if one_message["role"] == "user":
# 计算用户回答的耗时,用户只有一个在几点回答完成的时间answer_time
answer_time_str = one_message["answer_time"] # 用户回答完成的时间
# 需要计算出用户的耗时
answer_time = datetime.strptime(answer_time_str, "%Y-%m-%d %H:%M:%S")
# 用户答完时间减去访谈开始时间,就是总的这个问答对的耗时,在减去助手的回答时间,就是用户的回答时间了
qa_pair_cost_time = answer_time - start_time
human_cost_time = qa_pair_cost_time.total_seconds() - ai_cost_time # 用户回答的耗时
one_message["cost_time"] = human_cost_time
one_message["response_length"] = len(one_message["content"])
User_QA_Pairs.extend(QA_Pairs) # 这个用户的所有回答记录
r["QApairs"] = User_QA_Pairs
# print(r) #已经更新了interview_record_concurrency
one_data["interview_records"] = interview_record_concurrency
if not isinstance(one_data_copy["start_time"], str):
# 转换下时间
one_data["start_time"] = one_data_copy["start_time"].strftime("%Y-%m-%d %H:%M:%S")
one_data["end_time"] = one_data_copy["end_time"].strftime("%Y-%m-%d %H:%M:%S")
else:
one_data["start_time"] = one_data_copy["start_time"]
one_data["end_time"] = one_data_copy["end_time"]
return True, one_data

def prepare_mysql_plot_data(self, study_id, mysql_host="onlinedev"):
"""
准备项目的id和名称
Args:
study_id (): int
Returns:
"""

前端展示

D3js绘图的核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
function proecessViolinData(data) {
//为drawLLMQuestionStepCostTimeViolinChart处理数据
// 聚合数据
const stepCostTimeData = [];

data.interview_records.forEach(record => {
if (record.QApairs) {
record.QApairs.forEach(qaPair => {
qaPair.forEach(qa => {
if (qa.step_cost_time && Array.isArray(qa.step_cost_time)) {
qa.step_cost_time.forEach(step => {
const existingStep = stepCostTimeData.find(s => s.step === step.msg);
if (existingStep) {
existingStep.cost.push(step.cost);
} else {
stepCostTimeData.push({
step: step.msg,
cost: [step.cost]
});
}
});
}
});
});
}
});
console.log(`共收集到数据记录: ${stepCostTimeData.length}`)
return stepCostTimeData
}

//每次访谈人问问题时的耗时,每一步的耗时,包括各种查询和LLM的耗时
function drawLLMQuestionStepCostTimeViolinChart(source_data) {
const data = proecessViolinData(source_data);
const div_id = "llm_question_step_cost_time_chart"
const div_id_shark = `#${div_id}`
const chartElement = document.getElementById(div_id);
const margin = { top: 60, right: 30, bottom: 60, left: 40 }; // 增加上边距以容纳标题
const width = chartElement.offsetWidth - margin.left - margin.right;
const height = chartElement.offsetHeight - margin.top - margin.bottom;

// 创建SVG容器
d3.select(div_id_shark).selectAll('*').remove(); //先清空,然后绘图
const svg = d3.select(div_id_shark).append("svg")
.attr('width', width + margin.left + margin.right)
.attr('height', height + margin.top + margin.bottom)
.append('g')
.attr('transform', `translate(${margin.left},${margin.top})`)

// 创建X轴
const x = d3
.scaleBand()
.domain(data.map(d => d.step))
.range([0, width])
.padding(1)

svg
.append('g')
.attr('transform', `translate(0, ${height})`)
.call(d3.axisBottom(x))

// 创建Y轴
const maxCost = d3.max(data, d => d3.max(d.cost))
const y = d3.scaleLinear().domain([0, maxCost]).range([height, 0])
svg.append('g').call(d3.axisLeft(y))

// 计算每个step的密度
//不同的 ticks 值对图形的影响:
// 较小的值(例如ticks(20)):会生成较少的刻度点,图形的细节较少,曲线可能会变得比较粗糙。
// 较大的值(例如ticks(60)):会生成更多的刻度点,图形的细节更多,曲线更加平滑,但计算可能会稍慢。
const kde = kernelDensityEstimator(
kernelEpanechnikov(2), // 核函数,为何选择7:带宽参数影响平滑程度。值越大,平滑程度越高(图形更宽、更平滑),但细节会减少。值越小,图形更尖锐,但噪声会增加。7是一个经验性的值,你可以根据数据情况调整它。
y.ticks(60) // Y轴分布的范围, ticks的参数表示你希望在Y轴上生成多少个刻度点,通常用于核密度估计的计算。你可以根据数据分布和图表大小来调整这个值。
)

const allDensity = data.map(d => {
const density = kde(d.cost)
return { step: d.step, density }
})

// 绘制小提琴
svg
.selectAll('violin')
.data(allDensity)
.enter()
.append('path')
.attr('transform', d => `translate(${x(d.step)},0)`)
.datum(d => d.density)
.style('stroke', 'none')
.style('fill', '#69b3a2')
.attr('d', d3
.area()
.x0(d => -d[1] * 10)
.x1(d => d[1] * 10)
.y(d => y(d[0]))
.curve(d3.curveBasis)
)
// 添加标题
svg.append('text')
.attr('x', width / 2)
.attr('y', -margin.top / 2)
.attr('text-anchor', 'middle')
.style('font-size', '16px')
.style('font-weight', 'bold')
.text('访谈LLM每步耗时统计(小提琴图)');

// 添加 x 轴说明
svg.append('text')
.attr('x', width / 2)
.attr('y', height + margin.bottom / 2)
.attr('text-anchor', 'middle')
.text('步骤');

// 添加 y 轴说明
svg.append('text')
.attr('transform', 'rotate(-90)')
.attr('y', -margin.left / 1.5)
.attr('x', -height / 2)
.attr('text-anchor', 'middle')
.text('耗时(秒)');
}

function proecessScatterData(rawData) {
//为drawLLMQuestionStepCostTimeViolinChart处理数据
// 聚合数据
// 准备转换的数据结构
const dataMap = {};

// 遍历interview_records
rawData.interview_records.forEach((record) => {
if (record.QApairs) {
record.QApairs.forEach((qaPair) => {
const assistant = qaPair[0];
qaPair.forEach(qa => {
if (qa.step_cost_time && Array.isArray(qa.step_cost_time)) {
qa.step_cost_time.forEach((step) => {
const { msg, cost, total } = step;
// 如果step不在dataMap中,初始化它
if (!dataMap[msg]) {
dataMap[msg] = { step: msg, total_cost: [], cost_y: [], session_id: [] };
}
// 添加对应的total和cost
dataMap[msg].total_cost.push(total);
dataMap[msg].cost_y.push(cost);
dataMap[msg].session_id.push(record.session_id);
});
}
});
});
}
});

// 将结果转为数组
const result = Object.values(dataMap);
console.log(`共收集到数据记录: ${result.length}`)
return result
}

//每次访谈人问问题时的耗时,散点图
function drawLLMQuestionStepCostTimeScatterChart(source_data) {
const data = proecessScatterData(source_data);
const div_id = "llm_question_step_cost_time_chart_scatter"
const div_id_shark = `#${div_id}`
const chartElement = document.getElementById(div_id);
const margin = { top: 60, right: 30, bottom: 60, left: 40 }; // 增加上边距以容纳标题
const width = chartElement.offsetWidth - margin.left - margin.right;
const height = chartElement.offsetHeight - margin.top - margin.bottom;

// 创建SVG容器
d3.select(div_id_shark).selectAll('*').remove(); //先清空,然后绘图
const svg = d3.select(div_id_shark).append("svg")
.attr('width', width + margin.left + margin.right)
.attr('height', height + margin.top + margin.bottom)
.append('g')
.attr('transform', `translate(${margin.left},${margin.top})`);
// 设置坐标轴范围
const xMax = d3.max(data, d => d3.max(d.total_cost));
const yMax = d3.max(data, d => d3.max(d.cost_y));

const xScale = d3.scaleLinear()
.domain([0, xMax])
.range([0, width]);

const yScale = d3.scaleLinear()
.domain([0, yMax])
.range([height, 0]);

// 绘制x坐标轴
svg.append("g")
.attr('transform', `translate(0, ${height})`)
.call(d3.axisBottom(xScale));

svg.append("g")
// // .attr("transform", "translate(40,0)")
.call(d3.axisLeft(yScale));

// 绘制散点
data.forEach((d, index) => {
svg.selectAll(`.dot-${index}`)
.data(d.total_cost.map((cost, i) => ({ x: cost, y: d.cost_y[i], step: d.step, sid: d.session_id[i] })))
.enter()
.append("circle")
.attr("class", `dot-${index}`)
.attr("cx", d => xScale(d.x))
.attr("cy", d => yScale(d.y))
.attr("r", 2)
.attr("fill", colorScale(d.step))
.on("mouseover", function (event, d) {
// 创建 tooltip 并更新内容和位置
const tooltip = d3.select('#tooltip')
tooltip.style('opacity', 1)
.html(`Session_id: ${d.sid}<br>步骤: ${d.step}<br>总耗时: ${d.x}<br>耗时: ${d.y}`)
.style('left', (event.pageX + 5) + 'px')
.style('top', (event.pageY - 28) + 'px');

d3.select(this).attr('fill', 'orange'); // 鼠标悬停时改变颜色
})
.on("mousemove", function (event) {
// 更新 tooltip 位置
const tooltip = d3.select('#tooltip')
tooltip.style('left', (event.pageX + 5) + 'px')
.style('top', (event.pageY - 28) + 'px');
})
.on("mouseout", function () {
const tooltip = d3.select('#tooltip')
tooltip.style('opacity', 0); // 鼠标移出时隐藏 tooltip
d3.select(this).attr('fill', colorScale(d.step)); // 恢复原始颜色
});
});
// 添加标题
svg.append('text')
.attr('x', width / 2)
.attr('y', -margin.top / 2)
.attr('text-anchor', 'middle')
.style('font-size', '16px')
.style('font-weight', 'bold')
.text('访谈LLM每步耗时统计散点图');

// 添加 x 轴说明
svg.append('text')
.attr('x', width / 2)
.attr('y', height + margin.bottom / 2)
.attr('text-anchor', 'middle')
.text('加上前面的步骤后的总耗时');

// 添加 y 轴说明
svg.append('text')
.attr('transform', 'rotate(-90)')
.attr('y', -margin.left / 1.5)
.attr('x', -height / 2)
.attr('text-anchor', 'middle')
.text('耗时(秒)');
}

// 核密度估计函数
// 一个核密度估计(Kernel Density Estimation, KDE),用来生成小提琴图中的平滑曲线。核密度估计是一种估算数据分布的方法,通过核函数将离散的数据平滑化,得到连续的概率密度分布。
function kernelDensityEstimator(kernel, X) {
return function (V) {
return X.map(x => [x, d3.mean(V, v => kernel(x - v))])
}
}

//kernel: 核函数,用于对数据进行平滑处理。它会根据样本的距离来分配权重,越接近某个数据点的值,分配的权重越高。
function kernelEpanechnikov(k) {
return function (v) {
return Math.abs(v /= k) <= 1 ? 0.75 * (1 - v * v) / k : 0
}
}

改进

为了访谈过程时长的优化,我们从4个方面进行优化,对数据库进行了优化,部署后端服务和数据库更近,(因为测试发现,数据库的各种操作耗时较长由于网络的稳定性问题),然后减少数据库的查询和写入次数,所有中间状态都在前后端中传递,不进行中间保存。对LLM的使用进行优化,LLM的耗时不稳定,设置最大耗时时长,如果超时的话,使用outline中对应的内容作为下一个问题。


访谈的应用的并发测试
https://johnson7788.github.io/2024/10/21/%E8%AE%BF%E8%B0%88%E7%9A%84%E5%BA%94%E7%94%A8%E7%9A%84%E5%B9%B6%E5%8F%91%E6%B5%8B%E8%AF%95/
作者
Johnson
发布于
2024年10月21日
许可协议