请稍等...页面即将加载完成

python版消息前置服务

时间: 2012-01-11 / 分类: Python&Ruby / 浏览次数: 120 views / 0个评论 发表评论

      虽然在这次的德州demo中我只是把消息服务作为了一个线程处理,但并不影响实际的消息管理,其实也没有什么要说的,消息格式:消息长度+版本号+事件名+事件对应参数详情。之前写短信网关时候对SocketServer没有理解太深,没有设置socket的超时时间,结果导致大量请求挂起,导致最后服务挂起。

      今天一连写了三篇博客,大部分是对最近这段时间的所作描述,因为表述不怎么好,所以就贴了比较多的代码。以后再回过头来看曾经写的代码会是什么样的心情呢。在flash上我也写了一些模块,本本快没电了,现在也有点晚了,所以就放在下一篇文章中在描述吧。

View Code PYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#coding=gbk
import time
import struct
import socket
import logging
import traceback
import threading
import binascii
 
import Object
import Config
import ClientEvent
import SocketServer
import MsgPackage
import Connection
import Game.M as M
import Game.I as I
 
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    '''线程请求管理'''
 
    def setup(self):
        self.request.settimeout(Config.CLIENT_OUTTIME)
 
    def send(self, command, info):
        _data = MsgPackage.pack(command, info)
        _len = len(_data)
        _data = struct.pack('>L', _len) + _data
        self.request.send(_data)
        logging.info('send[%s] data[%s]:%s', M.get(command), _len, info)
 
    def handle(self):
        try:
            while True:
                ctime = time.time()
                buf = self.request.recv(4)
                if not buf:
                    break
 
                cmd_len = struct.unpack('>L', buf)[0]
                data = ''
                while len(data) < cmd_len:
                    obuf = self.request.recv(cmd_len - len(data))
                    if not obuf:
                        raise Exception('not receive anything!')
                    data += obuf
 
                _data = MsgPackage.unpack(data)
                logging.info('get[%s] data[%s]:%s', M.get(_data['c']), cmd_len, data)
 
                ret = ClientEvent.deal(_data['c'], _data['i'], self)
 
                if ret: #有返回消息
                    self.send(*ret)
 
        except socket.timeout:#连接超时
            cur_thread = threading.currentThread()
            emsg = cur_thread.getName() + ' timeout connect!'
            logging.info(emsg)
 
        except:
            logging.error('error in ThreadedTCPRequestHandler :%s, res:', traceback.format_exc())
 
        self.request.close()
        Connection.unbind(self.request)
 
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    pass
 
class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server = None
        ThreadedTCPServer.allow_reuse_address = True
 
    def run(self):
        server_check = threading.Thread(target=self.check)
        server_check.setDaemon(True)
        server_check.start()
 
        self.server = ThreadedTCPServer((self.host, self.port), ThreadedTCPRequestHandler)
        self.server.serve_forever()
 
    def check(self):
        while not Object.END_EVENT.isSet():
            Object.END_EVENT.wait(2)
 
        self.shutdown()
 
    def stop(self):
        self.shutdown()
 
if __name__ == '__main__':
    Server('0.0.0.0', 25525).run()

      下面这个是上文提到的GameServer代码,主要是轮询赛桌当前应该处理的事件和自处理事件。没有什么技术含量,主要是补充上篇博客的阅读:

View Code PYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#coding=gbk
import time
import traceback
import logging
import threading
 
import Config
import Queue
import Object
 
_INSTANCE_ = None
 
 
class GameServer:
 
    def __init__(self):
        self._table_conf = {}
        self._table_msgq = {}
 
    def join_table(self, table):
        if self._table_conf.has_key(table.table_id):
            return -1
 
        self._table_conf[table.table_id] = table
        self._table_msgq[table.table_id] = Queue.Queue()
        logging.info('table[%s] created!', table.table_id)
 
    def do_event(self, table_id, uid, command, info):
        '''处理一个事件'''
        t_queue = self._table_msgq.get(table_id, None)
        if not t_queue:
            return -1
        t_queue.put({'uid':uid, 'event':command, 'info':info})
        return True
 
    def get_event(self, table_id):
        '''获取一个事件'''
        t_queue = self._table_msgq.get(table_id, None)
        if not t_queue:
            return {}
        try:
            return t_queue.get_nowait()
        except:
            return {}
 
    def start(self):
        t = threading.Thread(target = self.run)
        t.setDaemon(True)
        t.start()
        return self
 
    def run(self):
        while not Object.END_EVENT.isSet():
            try:
                for tid, table in self._table_conf.items():
                    #logging.info('%s, %s, %s', table.status, table.wait_next_uid, table.wait_end_uid)
                    if table.status != Config.GAMEOVER_STATUS:
                        table.check(self.get_event(tid))
 
                time.sleep(1)
            except:
                logging.error('error in GameServer.run:%s', traceback.format_exc())
 
 
def getInstance():
    '''获取游戏服务进程'''
    global _INSTANCE_
    if not _INSTANCE_:
        _INSTANCE_ = GameServer()
    return _INSTANCE_
更多
订阅

发表评论

您的昵称 *

您的邮箱 *

您的网站


- three = 2