庆祝中华人民共和国成立70周年

高举旗帜 团结一致 锐意进取

Python socket:一个简易的 TCP 服务器

# -*- coding: utf-8 -*-
import socket  # 导入 socket 模块
from threading import Thread
import threading
import json as js
import time
import datetime
import redis
import os
from Pdo import Pdo

# Run relative to this file's directory so that relative paths (for example
# the ./run.log file written by Tcp.log) resolve next to the script.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Connection settings can be overridden through environment variables so that
# credentials do not have to live in the source file; the defaults are the
# values that were previously hard-coded here.
db = Pdo(
    user=os.environ.get("TCPS_DB_USER", "tcps_ckeck_cn"),
    password=os.environ.get("TCPS_DB_PASSWORD", "123456"),
    database=os.environ.get("TCPS_DB_NAME", "tcps_ckeck_cn"),
)
rds = redis.Redis(
    host=os.environ.get("TCPS_REDIS_HOST", "127.0.0.1"),
    port=int(os.environ.get("TCPS_REDIS_PORT", "6379")),
    db=0,
)


class Tcp:
    """Threaded TCP server that tracks connections and registered hardware.

    One background thread accepts connections; every accepted connection gets
    its own daemon thread running ``message_handle``.  Three dictionaries hold
    the server state, all keyed or valued by ``str(id(socket_object))``:

    * ``pool``        -- connection id -> client socket
    * ``hardware``    -- hardware CID  -> connection id (filled by handshake)
    * ``thread_pool`` -- connection id -> handler thread

    Incoming payloads are parsed as one JSON document per ``recv`` call; there
    is no message framing, so a message split across (or merged into) TCP
    segments fails to parse and is logged as an error.
    """

    def __init__(self, host="0.0.0.0", port=9999):
        """Bind the listening socket and start the acceptor thread.

        :param host: interface address to bind to.
        :param port: TCP port to listen on.
        :raises socket.error: if binding or listening fails.
        """
        self.host = host
        self.port = port
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
            # 5 is the accept backlog (pending connections), not a cap on the
            # number of simultaneous clients.
            sock.listen(5)
        except socket.error:
            # Do not leak the descriptor when the port is unavailable.
            sock.close()
            raise
        self.sock = sock
        self.pool = {}  # connection id -> socket
        self.hardware = {}  # hardware CID -> connection id
        self.thread_pool = {}  # connection id -> handler thread

        thread = Thread(target=self.accept_client, args=())
        thread.daemon = True
        thread.start()

    def log(self, log_text, log_type='', log_data=''):
        """Append one formatted line to ``./run.log``.

        :param log_text: message (or exception) to record; converted with str().
        :param log_type: tag naming the code path that produced the entry.
        :param log_data: optional extra payload; converted with str().
        """
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        line = ("【" + now + "】 [" + str(log_type) + "] {" + str(log_text)
                + "} (" + str(log_data) + ") ")
        with open(r'./run.log', 'a+') as f:
            f.write(line + '\n')

    def get_cid(self, client_id):
        """Return the hardware CID registered for a connection id.

        :param client_id: connection id, i.e. ``str(id(socket))``.
        :returns: the first CID whose registered connection id matches.
        :raises ValueError: if no hardware is registered for ``client_id``.
        """
        # Iterate over a snapshot: handler threads may register or remove
        # hardware while this lookup is running.
        for cid, registered_id in list(self.hardware.items()):
            if registered_id == client_id:
                return cid
        raise ValueError("no hardware registered for connection " + str(client_id))

    def close_client(self, client):
        """Forget a connection and close its socket.

        Safe to call more than once for the same socket.

        :param client: the client socket to close.
        :returns: the connection id of ``client``.
        """
        k = str(id(client))
        self.pool.pop(k, None)
        # The handler thread is not force-stopped: Python has no supported way
        # to kill a thread, and the handler leaves its loop on its own as soon
        # as recv() reports the closed socket.
        self.thread_pool.pop(k, None)
        # Drop every CID that points at this connection, not just the first.
        # (Persisting the offline state via db.save(... {"online": 0}) is
        # currently disabled.)
        for cid in [c for c, conn in list(self.hardware.items()) if conn == k]:
            self.hardware.pop(cid, None)
        try:
            client.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # The peer already disconnected; the socket must still be closed.
            pass
        try:
            client.close()
        except socket.error:
            pass
        self.log("断开链接:" + k, "close_client")
        return k

    def accept_client(self):
        """Accept connections forever, starting one handler thread for each."""
        while True:
            try:
                client, _ = self.sock.accept()  # blocks until a client connects
            except socket.error as e:
                # Record why the acceptor stops instead of dying silently.
                self.log(e, "accept_client")
                print("accept_client - ERROR!")
                return
            try:
                client_id = str(id(client))
                print("\033[1;32m 建立新链接:" + client_id + " \033[0m")
                self.log("建立新链接:" + client_id, "accept_client")
                self.pool[client_id] = client
                thread = Thread(target=self.message_handle, args=(client,))
                thread.daemon = True
                # Register before starting: a client that disconnects at once
                # would otherwise be cleaned up before the entry exists and
                # leave a stale thread behind in thread_pool.
                self.thread_pool[client_id] = thread
                thread.start()
            except Exception as e:
                self.log(e, "accept_client")
                print("accept_client - ERROR!")

    def message_handle(self, client):
        """Receive and process messages from one client until it disconnects.

        :param client: the connected client socket.
        """
        m_client_id = str(id(client))
        while True:
            try:
                payload = client.recv(1024)
            except socket.error as e:
                # Connection reset, or the socket was closed from another
                # thread: treat it exactly like an orderly disconnect.
                self.log(e, "message_handle", m_client_id)
                payload = b""
            if not payload:
                self.close_client(client)
                print("\n客户端下线:" + str(id(client)))
                break
            try:
                self._handle_message(client, m_client_id, payload)
            except Exception as e:
                self.log(e, "message_handle", payload)
                print("message_handle - ERROR!")
            if m_client_id not in self.pool:
                # The connection was closed while handling the message (for
                # example a rejected handshake); stop reading from it.
                break

    def _handle_message(self, client, client_id, payload):
        """Decode one JSON payload and act on its ``TYPE`` field."""
        data = js.loads(payload)
        if not isinstance(data, dict):
            raise ValueError("message is not a JSON object")
        TYPE = data.get("TYPE", 0)
        if TYPE == 0:
            print("\nunknown 'TYPE'" + str(data))
        elif TYPE == "HS":
            # Handshake: register the hardware for this connection.
            try:
                CID = data["CID"].encode("utf-8")
                if self.handshake(client, CID):
                    print("\033[1;34m 硬件注册成功 CID:" + CID + " -> " + client_id + "\033[0m")
                    self.log("硬件注册成功 CID:" + CID + " -> " + client_id, "accept_client")
                    rds.setex(CID, 10, int(time.time()))
            except Exception as e:
                self.log(e, "message_handle-HS", payload)
        elif TYPE == "STATE":
            # Hardware state update: store the raw payload under <CID>_state.
            rds.set(self.get_cid(client_id) + "_state", payload)
        elif TYPE == "HB":
            # Heartbeat: refresh the CID key that check_hardware tests for.
            rds.setex(self.get_cid(client_id), 10, time.time())
        elif TYPE == "DATA":
            # Business data: timestamp it and push onto the <CID>_data list,
            # trimmed to the most recent entries.
            data["TIME"] = int(time.time())
            data_rdsk = self.get_cid(client_id) + "_data"
            rds.lpush(data_rdsk, js.dumps(data))
            rds.ltrim(data_rdsk, 0, 1000)

    def _cid_is_safe(self, cid):
        """Return True if ``cid`` is non-empty and safe to embed in SQL text."""
        # NOTE(review): assumes real CIDs consist of letters, digits and
        # "_-.:" only (the CIDs visible in this file are hex strings) --
        # confirm against the hardware table before tightening or relaxing.
        return bool(cid) and all(ch.isalnum() or ch in "_-.:" for ch in cid)

    def handshake(self, client, hscid):
        """Register ``hscid`` for ``client`` if the hardware table knows it.

        :param client: the client socket performing the handshake.
        :param hscid: hardware CID sent by the client.
        :returns: True when the CID was accepted and acknowledged; False when
            it was rejected (the connection is then closed) or an error
            occurred.
        """
        try:
            # hscid arrives from the network and is interpolated into the SQL
            # condition below, so anything outside the conservative character
            # set is rejected before it can reach the database.
            if self._cid_is_safe(hscid) and db.count("hardware", "cid = '%s'" % hscid) > 0:
                try:
                    client.sendall(b'{"TYPE":"HSOK","data":"HSOK"}')
                    # Register only after the acknowledgement was delivered so
                    # that a failed send leaves no dangling registration.
                    self.hardware[hscid] = str(id(client))
                    return True
                except Exception as e:
                    self.log(e, "handshake-has", hscid)
                    return False
            else:
                print("\n\033[1;31m 硬件 [" + hscid + "] 不在数据库中,强制下线! \033[0m")
                self.log("硬件 [" + hscid + "] 不在数据库中,强制下线!", "handshake")
                self.close_client(client)
                return False
        except Exception as e:
            self.log(e, "handshake", hscid)
            return False

    def send_message(self, cid, msg):
        """Send ``msg`` to the hardware registered under ``cid``.

        Failures are printed and logged rather than raised.

        :param cid: hardware CID of the target device.
        :param msg: payload; text is encoded as UTF-8, bytes are sent as-is.
        """
        cid = str(cid)
        print("\n------发送数据------↓")
        print(msg)
        print("------发送数据------↑")

        try:
            client = self.pool[self.hardware[cid]]
        except KeyError:
            print("\n发送数据失败,未找到设备,CID为: " + cid)
            self.log("发送数据失败,未找到设备,CID为: " + cid, "send_message")
            return
        # Sockets carry bytes: encode text instead of decoding it, so that
        # non-ASCII payloads are transmitted correctly.
        if not isinstance(msg, bytes):
            msg = msg.encode("utf-8")
        try:
            client.sendall(msg)
        except socket.error as e:
            # The device exists but the transfer failed; report it as such.
            print("\n发送数据失败,CID为: " + cid)
            self.log(e, "send_message", cid)

    def check_hardware(self):
        """Close the first unauthenticated or timed-out connection found.

        A connection is unauthenticated when no hardware is registered for it,
        and timed out when the Redis key named after its CID no longer exists.
        At most one connection is closed per call.

        :returns: a status string naming the closed connection, or the number
            of pooled clients when nothing had to be closed.
        """
        registered = set(self.hardware.values())
        for chk in list(self.pool):
            if chk not in registered:
                print("\n\033[1;31m 断开未认证的连接:" + str(chk) + "\033[0m")
                self.close_client(self.pool[chk])
                self.log("断开未认证的连接:" + str(chk), "check_hardware")
                return "断开未认证的连接:" + str(chk)
            if not rds.exists(self.get_cid(chk)):
                print("\n\033[1;31m 断开超时的连接:" + str(chk) + "\033[0m")
                self.close_client(self.pool[chk])
                self.log("断开超时的连接:" + str(chk), "check_hardware")
                return "断开超时的连接:" + str(chk)
        return "客户端数量:" + str(len(self.pool))

    def check_threads(self):
        """Return the list of all currently alive threads."""
        return threading.enumerate()


if __name__ == '__main__':
    # Interactive operator console:
    #   1 -> send a message to a device   2 -> run one check_hardware pass
    #   3 -> show the connection pool     4 -> show the registered hardware
    #   5 -> show the handler threads
    # Numeric shortcuts for the known hardware CIDs.
    known_cids = {"1": "79AE114C", "2": "95286F24"}
    tcp = Tcp()
    while True:
        action = raw_input("ACTION:")
        if action == "1":
            cid_id = raw_input("CID:")
            # Anything that is not a shortcut is used as the CID itself, so
            # `cid` is always defined and never left over from a previous
            # iteration of the loop.
            cid = known_cids.get(cid_id, cid_id)
            msg_type = raw_input("TYPE:")
            value = raw_input("VALUE:")
            msg = '{"type":"' + msg_type + '","data":"' + value + '"}'
            print("\nsend msg'" + msg + "' to hardware that Cid is " + str(cid))
            tcp.send_message(cid, msg)
        elif action == "2":
            # Show the status string so the operator sees the result.
            print(tcp.check_hardware())
        elif action == "3":
            print(tcp.pool)
        elif action == "4":
            print(tcp.hardware)
        elif action == "5":
            print(tcp.thread_pool)

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注