1.服务端
主程序:
# encoding: utf-8
import threading
from procedure.socket_server import ThreadingHttpServer, MainHandler
from procedure.process import mq_respond_procedure
from logger import log,logwf
from conf import (HOST,
PORT,
MQ_A_RECV_1,
MQ_A_RECV_2,
MQ_A_RECV_3,
MQ_A_RECV_4,
MQ_B_RECV_1,
MQ_B_RECV_2,
MQ_B_RECV_3,
MQ_B_RECV_4)
def start_server():
log.info("大额挡板工具启动成功")
print("大额挡板工具启动成功")
threads = []
server = ThreadingHttpServer((HOST, PORT), MainHandler)
server_thread = threading.Thread(target=server.serve_forever)
threads.append(server_thread)
mq_a1_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_A_RECV_1, ))
mq_a2_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_A_RECV_2, ))
mq_a3_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_A_RECV_3, ))
mq_a4_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_A_RECV_4, ))
mq_b1_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_B_RECV_1, ))
mq_b2_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_B_RECV_2, ))
mq_b3_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_B_RECV_3, ))
mq_b4_thread = threading.Thread(target=mq_respond_procedure, args=(MQ_B_RECV_4, ))
threads.extend([mq_a1_thread, mq_a2_thread, mq_a3_thread, mq_a4_thread, mq_b1_thread, mq_b2_thread, mq_b3_thread, mq_b4_thread])
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
start_server()
socket服务端:
encoding: utf-8
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingMixIn, TCPServer
import threading
from logger import log
from conf import HOST, PORT
from procedure.process import send_total_procedure
log_id = 10000000
id_lock = threading.Lock()
class ThreadingHttpServer(ThreadingMixIn, TCPServer):
pass
class MainHandler(BaseHTTPRequestHandler):
# http服务端请求处理类
def do_POST(self):
'''处理post请求'''
content_len = int(self.headers['content-length']) # <--- Gets the size of data
mq_addr_list = eval(self.headers['mq-addr'])
body = self.rfile.read(content_len)
# print(filename_body_dic)
id_lock.acquire()
global log_id
log_id += 1
tag_id = log_id
id_lock.release()
filename_body_dic = eval(body.decode("utf-8"))
log.debug("[" + str(tag_id) + "]" + "收到客户端" + str(self.client_address) + "发来的报文:" + str(filename_body_dic))
res = send_total_procedure(filename_body_dic, mq_addr_list, tag_id) # 报文校验过程
if res:
res = str(res).encode("utf-8")
else:
res = str(list(filename_body_dic.keys())[0] + " 发送成功").encode("utf-8")
self.send_response(200)
self.send_header("Status", 200)
self.end_headers()
self.wfile.write(res)
if __name__ == "__main__":
server = ThreadingHttpServer((HOST, PORT), MainHandler)
t = threading.Thread(target=server.serve_forever)
t.start()
t.join()
2.客户端
import os
import time
import http.client
import re
def read_config():
'''
读取服务端的ip地址和端口
:return: ip地址, 端口
'''
with open("./config.txt", "r", encoding="utf-8") as f:
data = f.readlines()
host_ip = ""
port_num = 0
send_file_list = []
msg_line = data.index("--- 报文配置 ---\n")
for index in range(len(data)):
if re.search(r"HOST_IP \=(.+)", data[index]):
host_ip = re.search(r"HOST_IP \=(.+)", data[index]).group(1).strip()
elif re.search(r"PORT \=(.+)", data[index]):
port_num = int(re.search(r"PORT \=(.+)", data[index]).group(1).strip())
if index > msg_line:
if data[index]:
send_file_list.append(data[index])
return host_ip, port_num, send_file_list
def read_file(filename):
'''
读取message文件夹下的所有文件中的内容,把每个文件中的内容放入到列表中
:filename: 文件名
:return: 列表
'''
file_list = []
for file in os.listdir('./message'):
file_list.append(file)
if filename in file_list:
with open(os.path.join('./message', filename), 'rb') as f:
return {filename: f.read()} # {文件名: 文件内容}
else:
return None
def send_process(host_ip, port_num, mq_addr_list, filename):
'''
:param host_ip: 服务端的ip地址
:param port_num: 服务端的端口号
:param mq_addr_list: 连接mq的ip地址和端口的列表
:return: 若发送成功,则服务端返回发送成功。若发送失败,则服务端返回发送失败。
'''
msg = read_file(filename)
if not msg:
print("没有该文件:", filename)
return
headers = {"mq-addr": str(mq_addr_list)}
print("开始链接")
conn = http.client.HTTPConnection(host_ip, port_num)
print("开始发送请求")
conn.request(url=host_ip, method="POST", headers=headers, body=str(msg))
response = conn.getresponse().read().decode("utf-8")
print(response)
return
if __name__ == "__main__":
host_ip, port_num, send_file_list = read_config()
for item in send_file_list:
mq_addr_list = []
if item.strip():
time.sleep(int(item.split(',')[2]))
for addr in item.split(',')[1].split('/'):
mq_addr_list.append(addr)
send_process(host_ip, port_num, mq_addr_list, item.split(',')[0])