使用python作为TCP服务端 物联网应用
说明
- 因公司自主研发的电哒哒智慧能量柜需要通过TCP与服务器进行通讯,机柜定时传输硬件设备信息,用户扫码后通过服务端下发“租/还”命令,控制机柜开关锁等。
- 服务端通过pymysql直连数据库操作
- 仅做为项目记录,供参考勿喷
附上完整代码
import socketserver
import _thread
import time
import logging
from threading import Thread
from base import base
from helper import helper
from logger import Logger
from mysql import DB
class Myserver(socketserver.BaseRequestHandler):
tcp_data = ""
#主程序
def handle(self):
conn = self.request
conn.sendall(bytes("connection success",encoding="ANSI"))
# Logger('runtime.log', level='info').logger.info('设备已连接>>>' + str(conn))
while True:
try:
ret_bytes = conn.recv(1024 * 2)
ret_str = str(ret_bytes, encoding="ANSI")
tcp_data = ret_str
# 断开连接命令
if ret_str == "q":
# Logger('runtime.log', level='info').logger.info('设备已断开>>>' + str(conn))
conn.sendall(bytes("connection dropped", encoding="utf-8"))
break
//通讯建立后创建线程
_thread.start_new_thread(Myserver.task, (self,conn,ret_str))
except OSError as e:
# Logger('error.log', level='info').logger.info(str(e))
break
time.sleep(0.1)
#线程
def task(self,conn,data):
if data:
try:
while True:
//这里可以写自己的业务逻辑
time.sleep(1000)
except IndexError as e:
Logger('error.log', level='info').logger.info(e)
if __name__ == "__main__":
print('*' * 50)
print('温州引力聚合信息科技有限公司')
print('DIDADA Socket Service v1.0')
print('2019年1月16日')
print('*' * 50)
logging.basicConfig(format='【%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s:%(message)s】',level=logging.INFO)
ip = str(base.GetConfig('service','ip'))
port = int(base.GetConfig('service','port'))
base.KillPort(port)
logging.info('服务开启>>>地址>>>'+ ip +'>>>端口号>>>'+ str(port))
server = socketserver.ThreadingTCPServer((ip,port),Myserver)
server.serve_forever()
Logger类
import logging
from logging import handlers
import os
import ctypes
FOREGROUND_WHITE = 0x0007
FOREGROUND_BLUE = 0x01 # text color contains blue.
FOREGROUND_GREEN = 0x02 # text color contains green.
FOREGROUND_RED = 0x04 # text color contains red.
FOREGROUND_YELLOW = FOREGROUND_RED | FOREGROUND_GREEN
STD_OUTPUT_HANDLE= -11
std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
def set_color(color, handle=std_out_handle):
bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
return bool
class Logger(object):
level_relations = {
'debug':logging.DEBUG,
'info':logging.INFO,
'warning':logging.WARNING,
'error':logging.ERROR,
'crit':logging.CRITICAL
}#日志级别关系映射
def __init__(self,filename,level='info',when='D',backCount=3,fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
set_color(FOREGROUND_GREEN)
self.logger = logging.getLogger(filename)
format_str = logging.Formatter(fmt)#设置日志格式
self.logger.setLevel(self.level_relations.get(level))#设置日志级别
sh = logging.StreamHandler()#往屏幕上输出
sh.setFormatter(format_str) #设置屏幕上显示的格式
th = handlers.TimedRotatingFileHandler(filename=filename,when=when,backupCount=backCount,encoding='utf-8')#往文件里写入#指定间隔时间自动生成文件的处理器
#实例化TimedRotatingFileHandler
#interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
th.setFormatter(format_str)#设置文件里写入的格式
self.logger.addHandler(sh) #把对象加到logger里
self.logger.addHandler(th)
DB类
import pymysql.cursors
class DB(object):
# 连接数据库
def __init__(self, host='localhost', port=3306, user='root', passwd='', db='mysql', charset='utf8'):
self.host = host
self.port = port
self.user = user
self.passwd = passwd
self.db = db
self.charset = charset
self.connect = pymysql.Connect(
host=self.host,
port=self.port,
user=self.user,
passwd=self.passwd,
db=self.db,
charset=self.charset
)
# 使用cursor()方法获取操作游标
self.cursor = self.connect.cursor()
def select(self, sql):
# SQL 查询语句
try:
# 执行SQL语句
self.cursor.execute(sql)
# 获取所有记录列表
results = self.cursor.fetchall()
return results
except Exception as e:
return e
# 关闭数据库连接
self.connect.close()
Base类
import logging
import configparser
import os
import re
import requests
import json
class base:
def __init__(self):
pass
# 端口占用清理
def KillPort(port):
# # 查找端口的pid
find_port= 'netstat -aon | findstr %s' % port
result = os.popen(find_port)
text = result.read()
logging.info('清理占用端口>>>' + find_port)
logging.info('清理占用端口>>>' + text)
# logging.info(text [9:23])
if text[9:23] =='127.0.0.1:%s' % port:
pid = text[70:76]
logging.info('占用端口的pid:' + pid)
find_kill= 'taskkill -f -pid %s' %pid
result = os.popen(find_kill)
logging.info(result.read())
pass
#获取配置文件信息
def GetConfig(section,key,path='config.ini'):
config = configparser.ConfigParser()
config.read(path) # 配置文件的路径
v = config.get(section,key)
return v
#Post请求
def http_post(url,parameter={}):
return requests.post(url, params=parameter)
#判断字符串是否包含另外一个字符串
def find_string(s, t):
return t in s