# -*- coding: utf-8 -*-
import socket # 导入 socket 模块
from threading import Thread
import threading
import json as js
import time
import datetime
import redis
import os
from Pdo import Pdo
os.chdir(os.path.dirname(os.path.abspath(__file__)))
db = Pdo(user="tcps_ckeck_cn", password="123456", database="tcps_ckeck_cn")
rds = redis.Redis(host="127.0.0.1", port=6379, db=0)
class Tcp:
# 初始化
def __init__(self, host="0.0.0.0", port=9999):
self.host = host
self.port = port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 socket 对象
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen(5) # 最大等待数(有很多人理解为最大连接数,其实是错误的)
self.sock = sock
self.pool = {} # 连接池
self.hardware = {} # 硬件池
self.thread_pool = {} # 线程池
thread = Thread(target=self.accept_client, args=())
thread.setDaemon(True)
thread.start()
def log(self,log_text,log_type='',log_data=''):
log_text = str(log_text)
log_data = str(log_data)
log_type = str(log_type)
now = datetime.datetime.now().__format__('%Y-%m-%d %H:%M:%S')
log_text = "【"+now+"】 ["+log_type +"] {"+ log_text + "} ("+log_data +") "
file = r'./run.log'
with open(file, 'a+') as f:
f.write(log_text+'\n')
def get_cid(self, client_id):
get_cid = list(self.hardware.keys())[list(self.hardware.values()).index(client_id)]
return get_cid
def close_client(self, client):
k = str(id(client))
# # 删除连接
if k in list(self.pool.keys()):
del self.pool[k]
# # 清除线程 删除 线程池
if k in list(self.thread_pool.keys()):
self.thread_pool[k]._Thread__stop()
del self.thread_pool[k]
# # 清除设备
if k in list(self.hardware.values()):
cc_cid = self.get_cid(k)
# db.save("hardware", "cid = '%s'" % cc_cid, {"online": 0})
del self.hardware[cc_cid]
try:
client.shutdown(socket.SHUT_RDWR)
client.close()
except:
pass
self.log("断开链接:"+ k,"close_client")
return k
def accept_client(self):
# 接收新连接
while True:
client, _ = self.sock.accept() # 阻塞,等待客户端连接
try:
# 加入连接池
client_id = str(id(client))
print "\033[1;32m 建立新链接:" + client_id + " \033[0m"
self.log("建立新链接:" + client_id ,"accept_client");
self.pool[client_id] = client
# print self.pool
# 给每个客户端创建一个独立的线程进行管理
thread = Thread(target=self.message_handle, args=(client,))
# 设置成守护线程
thread.setDaemon(True)
thread.start()
self.thread_pool[client_id] = thread
except BaseException as e:
self.log(e,"accept_client")
print "accept_client - ERROR!"
# print "\naccept client error "
def message_handle(self, client):
# 消息处理
m_client_id = str(id(client))
while True:
bytes = client.recv(1024)
if len(bytes) > 0:
# 处理数据
# print "\n------接收数据------↓"
# print bytes
# print "------接收数据------↑"
try:
data = js.loads(bytes)
TYPE = data.get("TYPE", 0)
if TYPE == 0:
print "\nunknown 'TYPE'" + data
elif TYPE == "HS":
try:
# 握手注册
CID = data["CID"].encode("utf-8")
if self.handshake(client, CID):
print "\033[1;34m 硬件注册成功 CID:" + CID+" -> " + m_client_id + "\033[0m"
self.log("硬件注册成功 CID:" + CID + " -> " + m_client_id ,"accept_client");
rds.setex(CID, 10, int(time.time()))
except BaseException as e:
self.log(e,"message_handle-HS",bytes)
pass
elif TYPE == "STATE":
# 硬件状态更新
# print "\nget hardware state:"+bytes
state_rdsk = self.get_cid(m_client_id)+"_state"
rds.set(state_rdsk, bytes)
pass
elif TYPE == "HB":
# 心跳数据
# print "\nget HB data:"+bytes
rds.setex(self.get_cid(m_client_id), 10, time.time())
elif TYPE == "DATA":
# 硬件发送的业务数据
data["TIME"] = int(time.time())
data_rdsk = self.get_cid(m_client_id)+"_data"
rds.lpush(data_rdsk, js.dumps(data))
rds.ltrim(data_rdsk, 0, 1000)
pass
else:
pass
# except IOError as e:
except BaseException as e:
self.log(e,"message_handle",bytes)
print "message_handle - ERROR!"
# print "\ndata type is not 'json':" + bytes
# print self.hardware
else:
self.close_client(client)
print "\n客户端下线:" + str(id(client))
break
# 握手
def handshake(self, client, hscid):
try:
if db.count("hardware", "cid = '%s'" % hscid) > 0:
try:
self.hardware[hscid] = str(id(client))
# db.save("hardware", "cid = '%s'" % hscid, {"online": 1})
client.sendall('{"TYPE":"HSOK","data":"HSOK"}')
return True
except BaseException as e:
self.log(e,"handshake-has",hscid)
return False
else:
print "\n\033[1;31m 硬件 [" + hscid + "] 不在数据库中,强制下线! \033[0m"
self.log("硬件 [" + hscid + "] 不在数据库中,强制下线!","handshake")
self.close_client(client)
return False
except BaseException as e:
self.log(e,"handshake",hscid)
# 向cid 发送消息
def send_message(self, cid, msg):
cid = str(cid)
print "\n------发送数据------↓"
print msg
print "------发送数据------↑"
try:
sock_id = self.hardware[cid]
client = self.pool[sock_id]
client.sendall(msg.decode("utf-8"))
except:
print("\n发送数据失败,未找到设备,CID为: " + cid)
self.log("发送数据失败,未找到设备,CID为: " + cid, "send_message");
def check_hardware(self):
clients_id = self.hardware.values()
for chk in self.pool.keys():
if chk not in clients_id:
print "\n\033[1;31m 断开未认证的连接:"+str(chk) +"\033[0m"
self.close_client(self.pool[chk])
self.log("断开未认证的连接:"+str(chk),"check_hardware")
return "断开未认证的连接:"+str(chk)
if rds.exists(self.get_cid(chk)) == False:
print "\n\033[1;31m 断开超时的连接:"+str(chk) +"\033[0m"
self.close_client(self.pool[chk])
self.log("断开超时的连接:"+str(chk),"check_hardware")
return "断开超时的连接:"+str(chk)
return "客户端数量:"+str(len(self.pool))
def check_threads(self):
return threading.enumerate()
if __name__ == '__main__':
tcp = Tcp()
while True:
action = raw_input("ACTION:")
if action == "1":
cid_id = raw_input("CID:")
if cid_id == "1":
cid = "79AE114C"
if cid_id == "2":
cid = "95286F24"
type = raw_input("TYPE:")
value = raw_input("VALUE:")
msg = '{"type":"' + type + '","data":"' + value + '"}'
print "\nsend msg'" + msg + "' to hardware that Cid is " + str(cid)
tcp.send_message(cid, msg)
elif action == "2":
tcp.check_hardware()
elif action == "3":
print tcp.pool
elif action == "4":
print tcp.hardware
elif action == "5":
print tcp.thread_pool