Python实现的HTTP并发测试完整示例
(编辑:jimmy 日期: 2025/1/15 浏览:3 次 )
可修改变量thread_count指定最大的并发数量,即线程的数量。
完成之后,打印输出失败的次数,以及开始时间和结束时间,单位是毫秒。
主要是学习一下Python,仅供参考。
#!/usr/bin/python3 import sys, time, json, _thread import http.client, urllib.parse thread_count = 100 #并发数量 now_count = 0 error_count = 0 begin_time = '' lock_obj = _thread.allocate() def test_http_engine(): global now_count global error_count global thread_count global begin_time conn = None if now_count == 0: begin_time = int(round(time.time() * 1000)) try: conn = http.client.HTTPConnection("192.168.1.1", 80) conn.request('GET', '/') response = conn.getresponse() data = response.read() print (data) if json.dumps(response.status) != '200': error_count += 1; print ('error count: ' + str(error_count)) sys.stdout.flush() now_count += 1 if now_count == thread_count: print ('### error count: ' + str(error_count) + ' ###') print ('### begin time : ' + str(begin_time)) print ('### end time : ' + str(int(round(time.time() * 1000)))) except Exception as e: print (e) finally: if conn: conn.close() def test_thread_func(): global now_count global lock_obj cnt = 0 lock_obj.acquire() print ('') print ('=== Request: ' + str(now_count) + ' ===') cnt += 1 test_http_engine() sys.stdout.flush() lock_obj.release() def test_main(): global thread_count for i in range(thread_count): _thread.start_new_thread(test_thread_func, ()) if __name__=='__main__': test_main() while True: time.sleep(5)
下一篇:python基于Tkinter库实现简单文本编辑器实例