0%

前段时间在项目中碰到一个redis list插不进去的问题。背景是使用了redis list做消息队列,有两个服务器,server1存贮的redis string 类型的key的value是服务器1自己的mq1,server2要将消息插入server1的mq1。然后server1在读出这个mq1中存贮的消息。再去做一些业务操作。server1对mq1采用的是brpop,也就是阻塞的拿消息。

下面将mq这个redis list的key成为mq1。

我碰到了一个很奇葩的问题,server2从redis中读出了mq1,然后往mq1插入也没有报错。但是server1对mq1进行brpop一直拿不到消息。很是纳闷。甚至一度以为redis除了问题。下面进入艰难的调试过程。

先查看redis有多少连接

因为无论哪个server,对redis来说都是client进行操作,就想查看redis的连接。是会不会有别的server和我pop了同一个list,可以用client list命令查看。下面的输出是举例,调试时的输出已经过去太久找不到了。例如:

1
2
3
4
5
6
7
8
9
127.0.0.1:6006> client list
id=381 addr=192.xxx.xx.x:27804 fd=15 name= age=1367 idle=36 flags=b db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=brpop
id=385 addr=192.xxx.xx.x:52192 fd=10 name= age=8 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=32742 obl=0 oll=0 omem=0 events=r cmd=client
id=378 addr=192.xxx.xx.x:27316 fd=12 name= age=1367 idle=254 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get
id=382 addr=192.xxx.xx.x:27820 fd=16 name= age=1367 idle=33 flags=b db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=brpop
id=379 addr=192.xxx.xx.x:27328 fd=13 name= age=1367 idle=1367 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=info
id=380 addr=192.xxx.xx.x:27334 fd=14 name= age=1367 idle=1212 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=exec
id=10 addr=192.168.96.5:51404 fd=9 name= age=950692 idle=950692 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=9 addr=192.168.96.5:51402 fd=8 name= age=950692 idle=950692 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=info

在我查看之后,发现的确时一个lpush,一个pop都没有。(这些输出参数都有意义,感兴趣可以自己查一下)

查redis日志

每个服务器对于redis来说都是client,如果能查日志就好了,经过查询资料,可以用以下方式查日志。这个方式一般来说是用来查询慢查询的日志,但是可以设置,把慢查询的日志设置为0,既可以获取所有日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 查询log的时间阀值(微秒,一毫秒等于1000微秒),大于该数字的语句才会记录。负数表示不记录,0记录所有的。
`config get slowlog-log-slower-than`

# 设置log的时间阀值为0毫秒
`config set slowlog-log-slower-than 0`

# 查询log的最大条数。大于该数字,旧的会被丢弃。
`config get slowlog-max-len`

# 设置log的最大条数为300
`config set slowlog-max-len 300`

# 获取最近20条日志
`slowlog get 20`

在获取了日志之后,也没发现什么问题,server2的确插入了(这里的输出也是demo,调试输出找不到了)。而且并没有pop,这就奇怪了,难道有什么阻塞了么。

1
2
3
4
5
6
7
8
9
127.0.0.1:6006> slowlog get 20
1) 1) (integer) 2
2) (integer) 1626785061
3) (integer) 16357
4) 1) "lpush"
2) "game"
3) "{\"msg\":{\"data\":{\"playerName\":\"bfs\",\"msg\":\"\xe6\xb6\x88\xe6\x81\xaf\xe4\xbd\x938\",\"playerId\":\"832a3d73-317c-4a39-8df8-c2fd8965a1b3\",\"timestamp\":162380889111... (185 more bytes)"
5) "192.xxx.xx.x:27334"
6) ""

进入redis监视着模式

最后尝试了进入reids monitor,这个模式下可以动态观看日志。输入类似这样,所有的无所遁形。

1
2
3
4
127.0.0.1:6006> monitor
OK
1626787232.810556 [0 192.xxx.xx.x:27316] "get" "AuthToken-747b1041-7390-4d85-bcc2-a9f2221ce52d"
1626787232.831783 [0 192.xxx.xx.x:27316] "setex" "AuthToken-747b1041-7390-4d85-bcc2-a9f2221ce52d" "1800" "{\"tokenId\":\"747b1041-7390-4d85-bcc2-a9f2221ce52d\",\"generated\":1626783720,\"userId\":\"832a3d73-317c-4a39-8df8-c2fd8965a1b3\",\"expire\":1784463720}"

进入这个monitor之后,发现的确也是发现server2的确插入了,但是server1就是没能brpop出消息,就在人发呆之际,发现server1 brpop的key和server2插入的key不一样。server1 brpop的key是直接写死的mq1,然是server1在存贮mq1的时候进行了json化,导致server2拿到的mq1是\“mq1\”,所以一直插错了。。。

最后是自己坑了自己,但是是一个有节奏的调试过程,做记录。

今天项目中抓取日志的时候,因为日志打印出来的数据存是josn字符串,我要匹配的是这串

1
\"userId\":\"867340f5-b992-4fe5-92cc-5390de659491\",

按照经验,”和\是需要转换的,特殊字符串使用\做转换,”用\”转义,\用\转换,在使用ag做匹配时这样写

1
cat log1.txt | ag "{\\\"userId\\\\\":\\\\\"867340f5-b992-4fe5-92cc-5390de659491\\\","

但是却没有结果
在这里插入图片描述
经过尝试,发现原来\是需要使用\来转义才可以匹配,多一个少一个都不行。

1
cat log1.txt | ag "{\\\\\"userId\\\\\":\\\\\"867340f5-b992-4fe5-92cc-5390de659491\\\\\","

在这里插入图片描述

最近项目中高并发时碰到golang碰到了map并发读写的问题。
找问题找了好久,可以借助这个方发,编译的时候加上-race,在发生并发读写的地方会有提醒。

1
go run main.go -race

会出现DATA RACE,这张图是网上找的,我也尝试过,图找不到了。借用网友的图。
在这里插入图片描述
这是一个方法,使用这个也会有所缺陷,例如你的协程就不能开太多了,我记得好像只能开8000多个。而我开启了之后因为协程不够,并发不够又无法触发,最后是领导找到的。

最后我的原因是因为一般一个变量会定义在循环之外(这不是理所应当的么,当时写的时候没想那么多,而又是自己的思路,找了好久没找到),我在循环之外定义了变量,然后这个变量赋值的时候里面有map,导致了这个问题,最后把变量定义放在循环里就可以了。

最近在集群中查询redis碰到这个问题,

1
(error) MOVED 11863 xxx.xxx.xxx.xxx:6379

简单查询后发现,是因为redis是集群方式的,而使用redis-cli连接redis的时候,没有指定集群模式,指定后即可。

这样指定即可

1
./redis-cli -h xxx.xxx.xxx -p 6379 -c

最近在工作中碰到了这个问题,我需要在一个set中记录集群中k8s pod的ip,但是pod随时会挂,所以采取一个定时让每个pod去update cache的做法。但是set中是无法设置其中元素过期时间的。有两个解决方法

把set改用sorted set

每个元素都带上分数,这个分数就是你的过期时间。先用ZRemRangeByScore删除过期元素(分数之外的元素),然后每个pod带上过期时间戳去定时去EasyZAdd,下面是伪代码

1
2
3
4
5
6
7
8
for{
nowTime := time.Now().Unix()
redis.ZRemRangeByScore("setName", "0", strconv.FormatInt((nowTime-5), 10))
urlInfo := redis.Z{
Score: float64(nowTime), //以秒为单位
Member: *h.SqsUrl}
redis.ZAdd("setName", &urlInfo)
}

不用set,用string

这个就是使用固定的开头,例如:my*server_ip_xxx.xxx.xxx。然后使用传统的setex,获取key的时候使用模糊匹配,keys my_server_ip**也可。

用哪种仁者见仁智者见智。

最近在使用golang开发中,并发量达到一定程度时就会发生panic,想看堆栈,但是因为是跑在k8中的程序,输出都是直接输出到标准输出的。也没有日志。

在本地调试的时候直接运行了go程序,输出也是一大堆,一下子就被刷掉了,所以要把输出输入到一个文件里。

一般这样

1
2
3
command > filename // 把标准输出重定向到一个新文件中
// 例如
./programmer > log.txt

但是这样,如果发生堆栈错误还是会直接输出到屏幕。无法保存。

使用以下方法

1
2
3
command > filename 2>&1 // 把标准输出和标准错误一起重定向到一个文件中
// 例如
./programmer > log.txt 2>&1

这样即可保存标准输出和错误输出。如果把“>”换成”>>”就是追加输出到文件

做此纪录

本来使用vs做golang开发的时候,代码都会对齐的,忽然有一天,就不能对齐了。
在网上找了好多都没找到答案,最后今天看了Effective Go前面的Formatting,觉得是这个东西出问题了。
然后查看vs配置,File—->Preferences—->settting—->go。
在这里插入图片描述
点击后查看发现没有这一项,加上就好了。非常的简单。做此纪录
在这里插入图片描述

接上回,继续分析skynet网络线程。主要分析的是tcp,
网络很长很长,需要分两篇分析:
1分析别的线程向socket线程发送消息。
2分析客户端向socket线程发送消息。
网络线程执行代码如下。

socket_server的创建

skynet_state.c中调用的时候调用skynet_socket_init初始化,这里有一个需要需要的地方,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void 
skynet_socket_init() {
SOCKET_SERVER = socket_server_create(skynet_now());
}

struct socket_server *
socket_server_create(uint64_t time) {
int i;
int fd[2]; // 声明两个管道,读和写
poll_fd efd = sp_create(); // 创建epoll
// 检查epoll是否创建成功
if (sp_invalid(efd)) {
fprintf(stderr, "socket-server: create event pool failed.\n");
return NULL;
}
// 创建读、写管道
if (pipe(fd)) {
sp_release(efd);
fprintf(stderr, "socket-server: create socket pair failed.\n");
return NULL;
}
// 将读管道加入epoll,并监听可读
if (sp_add(efd, fd[0], NULL)) {
// add recvctrl_fd to event poll
fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
close(fd[0]);
close(fd[1]);
sp_release(efd);
return NULL;
}

// 创建socket_server
struct socket_server *ss = MALLOC(sizeof(*ss));
ss->time = time; // 关联创建时间
ss->event_fd = efd; // 关联epoll
ss->recvctrl_fd = fd[0]; // 关联读管道
ss->sendctrl_fd = fd[1]; // 关联写管道
ss->checkctrl = 1; // 将其他线程通知初始化为1,即啊有通知

// 一个socket_server创建32个socket结构
// 每个socket都是用于向外部发送消息的
// 也就是说,别的线程发过来的消息会通过这32个socket结构发出去
for (i=0;i<MAX_SOCKET;i++) {
struct socket *s = &ss->slot[i];
s->type = SOCKET_TYPE_INVALID; // type 初始化为SOCKET_TYPE_INVALID
clear_wb_list(&s->high); // 清空高优先级列表
clear_wb_list(&s->low); // 清空低优先级列表
spinlock_init(&s->dw_lock); // 自旋锁初始化
}
ss->alloc_id = 0;
ss->event_n = 0;
ss->event_index = 0;
memset(&ss->soi, 0, sizeof(ss->soi));
FD_ZERO(&ss->rfds);
assert(ss->recvctrl_fd < FD_SETSIZE);

return ss;
}

socket线程工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define CHECK_ABORT if (skynet_context_total()==0) break;

static void *
thread_socket(void *p) {
struct monitor * m = p; // 拿到服务器 monitor
skynet_initthread(THREAD_SOCKET); // 设置线程共享变量,标记为网络线程
for (;;) {
int r = skynet_socket_poll(); // socket处理
if (r==0)
break;
if (r<0) {
CHECK_ABORT // 如果socket返回值小于0,判断服务器时候还存在skynet服务
continue;
}
wakeup(m,0); // 如果没有工作线程,则通过条件变量唤醒线程
}
return NULL;
}

可以看到,主要的socket处理就是skynet_socket_poll,看到skynet_socket_poll。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int 
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER; // 获得socket_server
assert(ss);
struct socket_message result; // 声明socket消息
int more = 1;
int type = socket_server_poll(ss, &result, &more); // 获取消息类型
// 按照不同的类型处理消息
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR:
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
case SOCKET_UDP:
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING:
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}

可以看到,就是获取socket消息类型并处理的过程。按照以下顺序分析:
1.socket_server的结构、socket的结构,socket消息结构。
2.socket_server_poll获取消息类型。
3.别的服务发来socket各个消息怎么处理的。

socket相关结构

先简单看一下相关结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// socket_server.c
// socket write 链表节点结构
struct write_buffer {
struct write_buffer * next;
void *buffer;
char *ptr;
int sz;
bool userobject;
uint8_t udp_address[UDP_ADDRESS_SIZE];
};

#define SIZEOF_TCPBUFFER (offsetof(struct write_buffer, udp_address[0]))
#define SIZEOF_UDPBUFFER (sizeof(struct write_buffer))

// socket write 链表
struct wb_list {
struct write_buffer * head;
struct write_buffer * tail;
};

struct socket_stat {
uint64_t rtime;
uint64_t wtime;
uint64_t read;
uint64_t write;
};

struct socket {
uintptr_t opaque;
struct wb_list high; // 高优先级发送队列,wb是write buff
struct wb_list low; // 低优先级发送队列
int64_t wb_size; // 发送字节大小
struct socket_stat stat; // socket 的发送记录
volatile uint32_t sending; //
int fd; // socket文件描述符
int id; // 位于socket_server的slot列表中的位置
uint8_t protocol; // tcp or udp
uint8_t type; // epoll事件触发时,会根据type来选择处理事件的逻辑
uint16_t udpconnecting;
int64_t warn_size;
union {
int size;
uint8_t udp_address[UDP_ADDRESS_SIZE];
} p;
struct spinlock dw_lock; // 自旋锁
int dw_offset;
const void * dw_buffer;
size_t dw_size;
};

struct socket_server {
volatile uint64_t time;
int recvctrl_fd; // 接收管道消息的文件描述
int sendctrl_fd; // 发送管道消息的文件描述
int checkctrl; // 判断是否有其他线程通过管道,向socket线程发送消息的标记变量
poll_fd event_fd; // epoll实例id
int alloc_id; // 已经分配的socket slot列表id
int event_n; // 标记本次epoll事件的数量
int event_index; // 下一个未处理的epoll事件索引
struct socket_object_interface soi;
struct event ev[MAX_EVENT]; // epoll事件列表
struct socket slot[MAX_SOCKET]; // socket 列表
char buffer[MAX_INFO]; // 地址信息转成字符串以后,存在这里
uint8_t udpbuffer[MAX_UDP_PACKAGE];
fd_set rfds; // 文件描述符集合
};

// socket_server.h
struct socket_message {
int id; // skynet socket_server中32个socket结构中的哪个socket发出的消息
uintptr_t opaque; // 哪个skynet服务
int ud; // for accept, ud is new connection id ; for data, ud is size of data 如果是accept,ud是连接文件描述符,如果是data,ud是数据大小
char * data; // 数据
};

socket_server_poll获取消息类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// return type
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
// 进来就是个死循环
for (;;) {
// 是否有其他线程通过管道,向socket线程发送消息
// 这里是本篇分析
if (ss->checkctrl) {
// 判断是否有可读事件,函数之后有分析
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result); // 判断消息类型并返回,函数之后有分析
if (type != -1) {
// 如果获得消息类型,清零标记并返回
clear_closed_event(ss, result, type);
return type;
} else
// 无法获取类型则继续
continue;
} else {
ss->checkctrl = 0; // 没有可读事件就重置标志
}
}
// 下一个处理的epool事件索引 与 本次epoll事件的数量 是否相等
// 这里下一篇分析
if (ss->event_index == ss->event_n) {
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
if (more) {
*more = 0;
}
ss->event_index = 0;
if (ss->event_n <= 0) {
ss->event_n = 0;
if (errno == EINTR) {
continue;
}
return -1;
}
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l);
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > 0) {
return SOCKET_ACCEPT;
} if (ok < 0 ) {
return SOCKET_ERR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
fprintf(stderr, "socket-server: invalid socket\n");
break;
default:
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
} else {
type = forward_message_udp(ss, s, &l, result);
if (type == SOCKET_UDP) {
// try read again
--ss->event_index;
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
if (e->write) {
int type = send_buffer(ss, s, &l, result);
if (type == -1)
break;
return type;
}
if (e->error) {
// close when error
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
force_close(ss, s, &l, result);
result->data = (char *)err;
return SOCKET_ERR;
}
if(e->eof) {
force_close(ss, s, &l, result);
return SOCKET_CLOSE;
}
break;
}
}
}

// 不阻塞判断是否有可读事件
static int
has_cmd(struct socket_server *ss) {
struct timeval tv = {0,0}; // 声明超时时间
int retval; // 返回值

FD_SET(ss->recvctrl_fd, &ss->rfds); //将读管道描述符加入文件描述符集合

// 用select判断是否有可读事件
retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv);
if (retval == 1) {
return 1;
}
return 0;
}

// return type
// 获取事件类型
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
int fd = ss->recvctrl_fd; //拿到读管道描述符
// the length of message is one byte, so 256+8 buffer size is enough.
uint8_t buffer[256];
uint8_t header[2];
block_readpipe(fd, header, sizeof(header)); // 阻塞的读取消息头信息
int type = header[0]; // 从头信息获取消息类型
int len = header[1]; // 从头信息获取消息长度
block_readpipe(fd, buffer, len); // 阻塞读取消息内容
// ctrl command only exist in local fd, so don't worry about endian.
// 按照类型处理消息,下面会逐一分析各个函数
switch (type) {
case 'S':
return start_socket(ss,(struct request_start *)buffer, result);
case 'B':
return bind_socket(ss,(struct request_bind *)buffer, result);
case 'L':
return listen_socket(ss,(struct request_listen *)buffer, result);
case 'K':
return close_socket(ss,(struct request_close *)buffer, result);
case 'O':
return open_socket(ss, (struct request_open *)buffer, result);
case 'X':
result->opaque = 0;
result->id = 0;
result->ud = 0;
result->data = NULL;
return SOCKET_EXIT;
case 'D':
case 'P': {
int priority = (type == 'D') ? PRIORITY_HIGH : PRIORITY_LOW;
struct request_send * request = (struct request_send *) buffer;
int ret = send_socket(ss, request, result, priority, NULL);
dec_sending_ref(ss, request->id);
return ret;
}
case 'A': {
struct request_send_udp * rsu = (struct request_send_udp *)buffer;
return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
}
case 'C':
return set_udp_address(ss, (struct request_setudp *)buffer, result);
case 'T':
setopt_socket(ss, (struct request_setopt *)buffer);
return -1;
case 'U':
add_udp_socket(ss, (struct request_udp *)buffer);
return -1;
default:
fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
return -1;
};

return -1;
}

别的服务发过来 socket 各个消息类型的处理

这里按照open—->listen—->bind—->start—->再往后分析

open_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// 请求opensocket的消息类型
struct request_open {
int id;
int port;
uintptr_t opaque;
char host[1];
};

// return -1 when connecting
// 一个新的连接
static int
open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
int id = request->id; // 获取id
result->opaque = request->opaque; // 绑定这个socket对应的skynet服务
result->id = id; // 绑定定位于socket_server的slot列表中的位置
result->ud = 0;
result->data = NULL;
struct socket *ns; // 声明一个socket结构
int status; // 声明一个变量待会用来接收返回值
struct addrinfo ai_hints;
struct addrinfo *ai_list = NULL;
struct addrinfo *ai_ptr = NULL;
char port[16];
sprintf(port, "%d", request->port);
memset(&ai_hints, 0, sizeof( ai_hints ) );
ai_hints.ai_family = AF_UNSPEC;
ai_hints.ai_socktype = SOCK_STREAM;
ai_hints.ai_protocol = IPPROTO_TCP;

// 处理名字到地址以及服务到端口这两种转换,返回的是一个addrinfo的结构(列表)指针
status = getaddrinfo( request->host, port, &ai_hints, &ai_list );
if ( status != 0 ) {
result->data = (void *)gai_strerror(status); //如果失败收集错误信息
goto _failed;
}
int sock= -1;
// 遍历addrinfo链表
for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) {
// 根据信息,创建socket文件描述符
sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
if ( sock < 0 ) {
continue;
}
socket_keepalive(sock); // 设置为keepalive会自动发送心跳包检测
sp_nonblocking(sock); // 设置非阻塞

// 建立socket连接
status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
if ( status != 0 && errno != EINPROGRESS) {
close(sock);
sock = -1;
continue;
}
break;
}

if (sock < 0) {
result->data = strerror(errno);
goto _failed;
}

// 创建一个socket结构,关联到epoll
ns = new_fd(ss, id, sock, PROTOCOL_TCP, request->opaque, true);
if (ns == NULL) {
close(sock);
result->data = "reach skynet socket number limit";
goto _failed;
}

if(status == 0) {

ns->type = SOCKET_TYPE_CONNECTED;
struct sockaddr * addr = ai_ptr->ai_addr;
void * sin_addr = (ai_ptr->ai_family == AF_INET) ? (void*)&((struct sockaddr_in *)addr)->sin_addr : (void*)&((struct sockaddr_in6 *)addr)->sin6_addr;
if (inet_ntop(ai_ptr->ai_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
result->data = ss->buffer;
}
freeaddrinfo( ai_list );
return SOCKET_OPEN;
} else {
ns->type = SOCKET_TYPE_CONNECTING;
sp_write(ss->event_fd, ns->fd, ns, true);
}

freeaddrinfo( ai_list );
return -1;
_failed:
freeaddrinfo( ai_list );
ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID;
return SOCKET_ERR;
}

// 为一个新的连接创建一个epool
static struct socket *
new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool add) {
struct socket * s = &ss->slot[HASH_ID(id)];
assert(s->type == SOCKET_TYPE_RESERVE);

// 如果是add,则添加对fd监听读事件
if (add) {
if (sp_add(ss->event_fd, fd, s)) {
// 监听失败处理
s->type = SOCKET_TYPE_INVALID;
return NULL;
}
}

s->id = id; // 绑定solt中的位置
s->fd = fd; // 绑定fd
s->sending = ID_TAG16(id) << 16 | 0;
s->protocol = protocol; // 协议
s->p.size = MIN_READ_BUFFER;
s->opaque = opaque; // 绑定skynet服务
s->wb_size = 0;
s->warn_size = 0;
check_wb_list(&s->high); // 清空高优先级队列
check_wb_list(&s->low); // 清空低优先级队列
s->dw_buffer = NULL;
s->dw_size = 0;
memset(&s->stat, 0, sizeof(s->stat));
return s;
}

listen_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct request_listen {
int id;
int fd;
uintptr_t opaque;
char host[1];
};

static int
listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
int id = request->id; // 获取id
int listen_fd = request->fd; // 获取fd
// 这个上面已分析
struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
if (s == NULL) {
goto _failed;
}
s->type = SOCKET_TYPE_PLISTEN; // 记录类型
return -1;
_failed:
close(listen_fd);
result->opaque = request->opaque;
result->id = id;
result->ud = 0;
result->data = "reach skynet socket number limit";
ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID;

return SOCKET_ERR;
}

bind_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct request_bind {
int id;
int fd;
uintptr_t opaque;
};

static int
bind_socket(struct socket_server *ss, struct request_bind *request, struct socket_message *result) {
int id = request->id; // 获取请求的id
result->id = id; // 绑定id
result->opaque = request->opaque; // 绑定服务
result->ud = 0;

// 这个函数上面分析过了
struct socket *s = new_fd(ss, id, request->fd, PROTOCOL_TCP, request->opaque, true);
if (s == NULL) {
result->data = "reach skynet socket number limit";
return SOCKET_ERR;
}
sp_nonblocking(request->fd); // 设置非阻塞
s->type = SOCKET_TYPE_BIND; // 设置类型
result->data = "binding";
return SOCKET_OPEN;
}

start_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// socket 开始工作
static int
start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
int id = request->id; // 从接受到的消息获取id
result->id = id; // 绑定位于socket_server的slot列表中的位置
result->opaque = request->opaque; // 绑定与本socket关联的服务地址
result->ud = 0; // ud设置为0
result->data = NULL; // data为 null
struct socket *s = &ss->slot[HASH_ID(id)]; // 将id进行hash并存贮

// 判断是否为无效的socket
if (s->type == SOCKET_TYPE_INVALID || s->id !=id) {
result->data = "invalid socket";
return SOCKET_ERR;
}
struct socket_lock l;
socket_lock_init(s, &l);
if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
if (sp_add(ss->event_fd, s->fd, s)) {
force_close(ss, s, &l, result);
result->data = strerror(errno);
return SOCKET_ERR;
}
s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
s->opaque = request->opaque;
result->data = "start";
return SOCKET_OPEN;
} else if (s->type == SOCKET_TYPE_CONNECTED) {
// todo: maybe we should send a message SOCKET_TRANSFER to s->opaque
s->opaque = request->opaque;
result->data = "transfer";
return SOCKET_OPEN;
}
// if s->type == SOCKET_TYPE_HALFCLOSE , SOCKET_CLOSE message will send later
return -1;
}

close_socket

这里我们还需要额外看一看两个方法sp_write,force_close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
struct request_close {
int id;
int shutdown;
uintptr_t opaque;
};

// 判断发送队列是否为空
static inline int
send_buffer_empty(struct socket *s) {
return (s->high.head == NULL && s->low.head == NULL);
}

// 判断是否还有消息
static inline int
nomore_sending_data(struct socket *s) {
return send_buffer_empty(s) && s->dw_buffer == NULL && (s->sending & 0xffff) == 0;
}

static int
close_socket(struct socket_server *ss, struct request_close *request, struct socket_message *result) {
int id = request->id; // 获取id
struct socket * s = &ss->slot[HASH_ID(id)]; // 获取socket结构

// 如果已经是无效的socket
if (s->type == SOCKET_TYPE_INVALID || s->id != id) {
result->id = id;
result->opaque = request->opaque;
result->ud = 0;
result->data = NULL;
return SOCKET_CLOSE;
}
struct socket_lock l;
socket_lock_init(s, &l); // 读写锁
if (!nomore_sending_data(s)) {
// 如果还有消息没发送则发送完
int type = send_buffer(ss,s,&l,result);
// type : -1 or SOCKET_WARNING or SOCKET_CLOSE, SOCKET_WARNING means nomore_sending_data
if (type != -1 && type != SOCKET_WARNING)
return type;
}
if (request->shutdown || nomore_sending_data(s)) {
// 没有消息了,可以关闭
force_close(ss,s,&l,result);
result->id = id;
result->opaque = request->opaque;
return SOCKET_CLOSE;
}
s->type = SOCKET_TYPE_HALFCLOSE;

return -1;
}

/*
Each socket has two write buffer list, high priority and low priority.

1. send high list as far as possible.
2. If high list is empty, try to send low list.
3. If low list head is uncomplete (send a part before), move the head of low list to empty high list (call raise_uncomplete) .
4. If two lists are both empty, turn off the event. (call check_close)
*/
static int
send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
assert(!list_uncomplete(&s->low));
// step 1
if (send_list(ss,s,&s->high,l,result) == SOCKET_CLOSE) {
return SOCKET_CLOSE;
}
if (s->high.head == NULL) {
// step 2
if (s->low.head != NULL) {
if (send_list(ss,s,&s->low,l,result) == SOCKET_CLOSE) {
return SOCKET_CLOSE;
}
// step 3
if (list_uncomplete(&s->low)) {
raise_uncomplete(s);
return -1;
}
if (s->low.head)
return -1;
}
// step 4
assert(send_buffer_empty(s) && s->wb_size == 0);
sp_write(ss->event_fd, s->fd, s, false);

if (s->type == SOCKET_TYPE_HALFCLOSE) {
force_close(ss, s, l, result);
return SOCKET_CLOSE;
}
if(s->warn_size > 0){
s->warn_size = 0;
result->opaque = s->opaque;
result->id = s->id;
result->ud = 0;
result->data = NULL;
return SOCKET_WARNING;
}
}

return -1;
}

static void
sp_write(int efd, int sock, void *ud, bool enable) {
struct epoll_event ev;
ev.events = EPOLLIN | (enable ? EPOLLOUT : 0);
ev.data.ptr = ud;
epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev);
}

static void
force_close(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
result->id = s->id; // 关联id
result->ud = 0;
result->data = NULL;
result->opaque = s->opaque; // 关联服务
if (s->type == SOCKET_TYPE_INVALID) {
return;
}
assert(s->type != SOCKET_TYPE_RESERVE);
free_wb_list(ss,&s->high); // 清空高优先级队列
free_wb_list(ss,&s->low); // 清空低优先级队列
if (s->type != SOCKET_TYPE_PACCEPT && s->type != SOCKET_TYPE_PLISTEN) {
sp_del(ss->event_fd, s->fd); //删除对fd的监听
}
socket_lock(l); // 上锁
if (s->type != SOCKET_TYPE_BIND) {
if (close(s->fd) < 0) { // 关闭fd
perror("close socket:");
}
}
s->type = SOCKET_TYPE_INVALID;
if (s->dw_buffer) {
free_buffer(ss, s->dw_buffer, s->dw_size); //释放buff
s->dw_buffer = NULL;
}
socket_unlock(l); // 解锁
}

send_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
When send a package , we can assign the priority : PRIORITY_HIGH or PRIORITY_LOW

If socket buffer is empty, write to fd directly.
If write a part, append the rest part to high list. (Even priority is PRIORITY_LOW)
Else append package to high (PRIORITY_HIGH) or low (PRIORITY_LOW) list.
*/

struct request_send {
int id;
int sz;
char * buffer;
};

struct send_object {
void * buffer;
int sz;
void (*free_func)(void *);
};

static int
send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) {
int id = request->id; // 获取id
struct socket * s = &ss->slot[HASH_ID(id)]; // 获取socket 结构
struct send_object so; // 声明发送对象
send_object_init(ss, &so, request->buffer, request->sz);
// 判断socket是否是无效
if (s->type == SOCKET_TYPE_INVALID || s->id != id
|| s->type == SOCKET_TYPE_HALFCLOSE
|| s->type == SOCKET_TYPE_PACCEPT) {
so.free_func(request->buffer);
return -1;
}
// 判断是否是listen类型socket
if (s->type == SOCKET_TYPE_PLISTEN || s->type == SOCKET_TYPE_LISTEN) {
fprintf(stderr, "socket-server: write to listen fd %d.\n", id);
so.free_func(request->buffer);
return -1;
}
// 如果没有数据,或者是连接中的socket
if (send_buffer_empty(s) && s->type == SOCKET_TYPE_CONNECTED) {
if (s->protocol == PROTOCOL_TCP) {
append_sendbuffer(ss, s, request); // add to high priority list, even priority == PRIORITY_LOW
} else {
// udp
if (udp_address == NULL) {
udp_address = s->p.udp_address;
}
union sockaddr_all sa;
socklen_t sasz = udp_socket_address(s, udp_address, &sa);
if (sasz == 0) {
// udp type mismatch, just drop it.
fprintf(stderr, "socket-server: udp socket (%d) type mistach.\n", id);
so.free_func(request->buffer);
return -1;
}
int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
if (n != so.sz) {
append_sendbuffer_udp(ss,s,priority,request,udp_address);
} else {
stat_write(ss,s,n);
so.free_func(request->buffer);
return -1;
}
}
sp_write(ss->event_fd, s->fd, s, true); // 监听写就绪事件
} else {
// 正常的可以直接发送的socket
if (s->protocol == PROTOCOL_TCP) {
// 如果是tcp,按照队列优先级添加
if (priority == PRIORITY_LOW) {
append_sendbuffer_low(ss, s, request);
} else {
append_sendbuffer(ss, s, request);
}
} else {
if (udp_address == NULL) {
udp_address = s->p.udp_address;
}
append_sendbuffer_udp(ss,s,priority,request,udp_address);
}
}
// 如果发送的包太大,则报一个警告
if (s->wb_size >= WARNING_SIZE && s->wb_size >= s->warn_size) {
s->warn_size = s->warn_size == 0 ? WARNING_SIZE *2 : s->warn_size*2;
result->opaque = s->opaque;
result->id = s->id;
result->ud = s->wb_size%1024 == 0 ? s->wb_size/1024 : s->wb_size/1024 + 1;
result->data = NULL;
return SOCKET_WARNING;
}
return -1;
}

// 发送对象初始化
static inline bool
send_object_init(struct socket_server *ss, struct send_object *so, void *object, int sz) {
if (sz < 0) {
so->buffer = ss->soi.buffer(object);
so->sz = ss->soi.size(object);
so->free_func = ss->soi.free;
return true;
} else {
so->buffer = object;
so->sz = sz;
so->free_func = FREE;
return false;
}
}

// 添加到发送队列
static inline void
append_sendbuffer(struct socket_server *ss, struct socket *s, struct request_send * request) {
struct write_buffer *buf = append_sendbuffer_(ss, &s->high, request, SIZEOF_TCPBUFFER);
s->wb_size += buf->sz;
}

static struct write_buffer *
append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_send * request, int size) {
struct write_buffer * buf = MALLOC(size);
struct send_object so;
buf->userobject = send_object_init(ss, &so, request->buffer, request->sz);
buf->ptr = (char*)so.buffer;
buf->sz = so.sz;
buf->buffer = request->buffer;
buf->next = NULL;
if (s->head == NULL) {
s->head = s->tail = buf;
} else {
assert(s->tail != NULL);
assert(s->tail->next == NULL);
s->tail->next = buf;
s->tail = buf;
}
return buf;
}

至此,别的线程发送到网络线程的消息已经大概分析完毕,下篇分析客户端发过来数据,网络线程怎么处理。

继续按照skynet.start中的顺序分析。分析skynet定时器。启动各个线程都会做类似的初始化。

定时器分析

这里要做一个前置理解,定时器,是只需要关心最近即将到来的任务,而不用关心距离现在比较远的任务,例如
1.你注册了100个定时任务,定时任务按照时间排序后
2.A任务2s后触发,B任务3h后触发,C任务h后触发……
3.当刷新时间的时候,需要去拿到即将到来的任务而需要关心之后的任务,这里就是只需要关心A任务,也就是你不需要去遍历所有任务列表拿到即将发生的任务,因为B之后的任务包括B,时间间隔太远了都不需要去关心。

所以定时器的设计需要:
1.查询快
2.做好排序
3.删除做完的任务之后不影响之前结构

常见的设计例如使用最小堆,时间轮,红黑树,而skynet使用的是时间轮。

时间轮的设计就类似于我们生活中的钟表,大概说就是,秒针走一圈,分针走一格,分针走一圈,时针走一格。
运用到程序中,大概就是,
1.我们只关心秒针的任务。
2.秒针的一圈任务执行完了之后,拿分针一格的任务分配到秒针各秒中。这样就我们每次都只会从秒针的任务中拿到需要执行的任务,和第1点一致。
3.分针的一圈任务执行完了之后,拿时针一格的任务分配到分针各分中。

先看定时器结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#define TIME_NEAR_SHIFT 8					// 临近时间转移量
#define TIME_NEAR (1 << TIME_NEAR_SHIFT) // 临近时间量 0x10000
#define TIME_LEVEL_SHIFT 6 // 别的时间等级转移量
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT) // 时间等级 0x1100
#define TIME_NEAR_MASK (TIME_NEAR-1) // 临近时间掩码 0x1111
#define TIME_LEVEL_MASK (TIME_LEVEL-1) // 别的时间等级掩码 0x1011

struct timer_event {
uint32_t handle; //即是设置定时器的来源,又是超时消息发送的目标
int session; //session,一个增ID,溢出了从1开始,所以不要设时间很长的timer
};

// 时间节点链表
struct timer_node {
struct timer_node *next; // 下一个节点地址
uint32_t expire; // 到期时间
};

// 定时器任务链表,
// 链表每个节点是一个时间节点链表
// 这里一定要清楚,定时器任务链表上每个节点是一个时间节点链表
// 即同一时间的任务链表
struct link_list {
struct timer_node head; // 头节点
struct timer_node *tail; // 尾节点
};

struct timer {
// (临近时间)8 + 4*(另外等级)6 = 32,刚好32位
struct link_list near[TIME_NEAR]; // 临近时间的定时器任务链表,也就是我上面说的秒针
struct link_list t[4][TIME_LEVEL]; // 剩下的四个等级的任务链表

struct spinlock lock; // 自旋锁
uint32_t time; // 服务器经过的的tick数,每10毫秒tick一次
uint32_t starttime; // 程序启动时间戳
uint64_t current; // 启动到现在的耗时,精度10毫秒级
uint64_t current_point; // 当前时间,精度10毫秒级
};

定时器初始化

skynet_start中调用 skynet_timer_init初始化,在我的《从头开始读skynet源码(1)》中有写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// skynet_start中调用 skynet_timer_init初始化
void
skynet_timer_init(void) {
TI = timer_create_timer();
uint32_t current = 0;
systime(&TI->starttime, &current);
TI->current = current;
TI->current_point = gettime();
}

static struct timer *
timer_create_timer() {
struct timer *r=(struct timer *)skynet_malloc(sizeof(struct timer));
memset(r,0,sizeof(*r));

int i,j;

// 重置临近时间定时器任务链表
for (i=0;i<TIME_NEAR;i++) {
link_clear(&r->near[i]);
}

// 重置别的等级定时器任务链表
for (i=0;i<4;i++) {
for (j=0;j<TIME_LEVEL;j++) {
link_clear(&r->t[i][j]);
}
}

// 初始化按互斥锁
SPIN_INIT(r)

// 重置启动到现在的耗时
r->current = 0;

return r;
}

// 清空该任务节点
// 一次取出该任务节点的所有任务
static inline struct timer_node *
link_clear(struct link_list *list) {
struct timer_node * ret = list->head.next;
list->head.next = 0;
list->tail = &(list->head);

return ret;
}

启动定时器线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static void
start(int thread) {
...
create_thread(&pid[1], thread_timer, m);
...
}

// 定时器线程
static void *
thread_timer(void *p) {
struct monitor * m = p; // 拿到服务器 monitor
skynet_initthread(THREAD_TIMER); // 设置线程共享变量,标记为定时器线程

// 线程死循环
for (;;) {
skynet_updatetime(); // 刷新skynet时间
skynet_socket_updatetime(); // 更新socket的时间
CHECK_ABORT // 检查是否还有skynet服务
wakeup(m,m->count-1); // 尝试唤醒
usleep(2500); // 休息2.5毫秒

// 如果收到退出信号,则退出循环
if (SIG) {
signal_hup();
SIG = 0;
}
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}

// 如果有未工作的线程,将其唤醒(虚假唤醒)
static void
wakeup(struct monitor *m, int busy) {
if (m->sleep >= m->count - busy) {
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond);
}
}

可以看到,定时器就是每2.5毫秒刷新一次,至于定时任务的处理,看到skynet_updatetime。这里就是skynet定时器的精华。

skynet_updatetime,定时器刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
void
skynet_updatetime(void) {
uint64_t cp = gettime(); // 获取当前时间

if(cp < TI->current_point) {
// 如果当前时间小于服务器时间则重置时间
skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point);
TI->current_point = cp;
} else if (cp != TI->current_point) {
// 如果当前时间大于等于服务器时间
// 获取相差时间
uint32_t diff = (uint32_t)(cp - TI->current_point);

TI->current_point = cp; // 更新服务器时间
TI->current += diff; // 更新启动到现在的耗时

// 处理相差时间的定时任务
int i;
for (i=0;i<diff;i++) {
timer_update(TI);
}
}
}

// 给一个时间差,处理相差时间的定时器任务
static void
timer_update(struct timer *T) {
SPIN_LOCK(T); // 加锁

// try to dispatch timeout 0 (rare condition)
timer_execute(T); // 处理任务

// shift time first, and then dispatch timer message
// 重新分配任务,也就是我说的秒针走了一圈,
// 把分针任务分配到秒针的概念
timer_shift(T);

// 再次尝试处理任务,类似分针走一格,重新分配秒针
// 重新分配后,第一秒的任务也触发了
timer_execute(T);

SPIN_UNLOCK(T); // 解锁
}

// 处理任务
static inline void
timer_execute(struct timer *T) {
// 当前时间与时间掩码做&运算
// 时间掩码 0x1111,&运算之后
// 获得当前时间需要处理的所有任务链表
// 我们只关心即将到来的所有任务
int idx = T->time & TIME_NEAR_MASK;

while (T->near[idx].head.next) {
// 如果该任务节点有任务,取出所有任务
struct timer_node *current = link_clear(&T->near[idx]);
SPIN_UNLOCK(T);
// dispatch_list don't need lock T
dispatch_list(current);
SPIN_LOCK(T);
}
}

// 分发时间任务
static inline void
dispatch_list(struct timer_node *current) {
// 循环处理这个任务节点的所有任务
do {
// 获取任务结构,这个是注册定时器任务的时候加入的
// 下面会有分析
struct timer_event * event = (struct timer_event *)(current+1);

// 给任务加上信息
struct skynet_message message;
message.source = 0;
message.session = event->session; //这个很重要,接收侧靠它来识别是哪个timer
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

// 将任务放入消息队列
skynet_context_push(event->handle, &message);

// 获取下一个任务
struct timer_node * temp = current;
current=current->next;
skynet_free(temp);
} while (current);
}

// skynet服务注册定时器任务
int
skynet_timeout(uint32_t handle, int time, int session) {
if (time <= 0) {
// 立刻触发
struct skynet_message message;
message.source = 0;
message.session = session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

if (skynet_context_push(handle, &message)) {
return -1;
}
} else {
// 稍后触发
struct timer_event event;
event.handle = handle; //记录是哪个skynet服务
event.session = session;
timer_add(TI, &event, sizeof(event), time); //加入定时器任务链表
}

return session;
}

//加入定时器任务链表
static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
memcpy(node+1,arg,sz);

SPIN_LOCK(T);

node->expire=time+T->time; // 记录触发时间
add_node(T,node); // 加入到合适的节点

SPIN_UNLOCK(T);
}

// 加入到合适的节点
static void
add_node(struct timer *T,struct timer_node *node) {
uint32_t time=node->expire; // 触发时间
uint32_t current_time=T->time; // 启动到现在经过的tick

if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
// 如果是临近发生的,则丢入临近发生链表的对应节点
// 触发时间 & 0x1111,获得触发时间对应的节点
link(&T->near[time&TIME_NEAR_MASK],node);
} else {
// 如果不是临近发生的,则丢入对应的链表
int i;
uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
// 在接下来的三等级中寻找
for (i=0;i<3;i++) {
if ((time|(mask-1))==(current_time|(mask-1))) {
break;
}
mask <<= TIME_LEVEL_SHIFT;
}
// 如果不是前面三个等级,则是第四个等级
link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
}
}

// 将时间节点加入
tatic inline void
link(struct link_list *list,struct timer_node *node) {
list->tail->next = node;
list->tail = node;
node->next=0;
}

skynet_socket_updatetime 网络线程刷新时间

1
2
3
4
5
6
7
8
9
void
skynet_socket_updatetime() {
socket_server_updatetime(SOCKET_SERVER, skynet_now());
}

void
socket_server_updatetime(struct socket_server *ss, uint64_t time) {
ss->time = time;
}

至此,定时器方面的分析,大概完毕,下篇分享thread_socket,skynet网络线程。

monitor

monitor分为服务器的monitor 和 skynet_monitor,整个服务器只有一个服务器monitor,每条工作线程绑定一个skynet_monitor,上面在声明了若干条线程数量后,先做的就是创建服务器monitor。

服务器 monitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 可以把服务退出的消息从框架层抛出来,让上层逻辑可以感知到
struct monitor {
int count; // 多少条工作线程
struct skynet_monitor ** m; // skynet_monitor
pthread_cond_t cond; // 条件变量
pthread_mutex_t mutex; // 互斥锁
int sleep;
int quit; // 服务器是否退出的标记
};

start(int thread) {
...
struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread;
m->sleep = 0;

m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}
create_thread(&pid[0], thread_monitor, m);
...
}

工作线程监视 skynet_monitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// skynet_monitor.h
struct skynet_monitor * skynet_monitor_new();//新建一个监视器
void skynet_monitor_delete(struct skynet_monitor *);//删除一个监视器
void skynet_monitor_trigger(struct skynet_monitor *, uint32_t source, uint32_t destination);//通知监视器开始
void skynet_monitor_check(struct skynet_monitor *);//检查监视器是否陷入死循环

// skynet_monitor.c
// 主要用于监测skynet服务在处理消息时是否陷入死循环
struct skynet_monitor {
int version; // 当前版本
int check_version; // 检查版本
uint32_t source; // 消息源服务
uint32_t destination; // 消息目标服务
};

truct skynet_monitor *
skynet_monitor_new() {
struct skynet_monitor * ret = skynet_malloc(sizeof(*ret));
memset(ret, 0, sizeof(*ret));
return ret;
}

thread_monitor 线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 如果没有正在运行的skynet_context(skynet服务)则break
#define CHECK_ABORT if (skynet_context_total()==0) break;

// skynet_server.c
// 设置统一线程内的共享数据
// 线程内部的各个函数调用都能访问、但其它线程不能访问的变量
// 处理了进程内静态变量全部线程共享的问题
void
skynet_initthread(int m) {
uintptr_t v = (uint32_t)(-m);
pthread_setspecific(G_NODE.handle_key, (void *)v);
}

// skynet_monitor.c
// 检查服务是否陷入死循环
void
skynet_monitor_check(struct skynet_monitor *sm) {
if (sm->version == sm->check_version) {
// 这说明一个消息处理了5到10秒还没有返回,
// 导致sm->version一直没有增长。下一次check发现没有增长,
// 就认为它可能发生死循环了。
// 处理也只是加一个标记,并打印一条错误日志,没有再做其他处理。
if (sm->destination) {
skynet_context_endless(sm->destination);
skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
}
} else {
// 如果版本不一致,更新检查版本
sm->check_version = sm->version;
}
}

static void *
thread_monitor(void *p) {
struct monitor * m = p; //拿到服务器monitor
int i;
int n = m->count; // 拿到工作线程数

// 设置统一线程内的共享数据THREAD_MONITOR,标记为monitor线程
skynet_initthread(THREAD_MONITOR);

// 线程死循环
for (;;) {
// 定义在上面,检查是否还有skynet服务运行
CHECK_ABORT

// 检查每一个工作线程是否陷入死循环
for (i=0;i<n;i++) {
skynet_monitor_check(m->m[i]);
}
// 每秒检查一次是否还有服务在运行
// 延迟5s后再进入上面的检查工作线程
for (i=0;i<5;i++) {
CHECK_ABORT
sleep(1);
}
}

return NULL;
}

至此,skynet.star大概做了什么,monitor部分功能都做了大概分析。下篇顺着skynet.star分析,thread_timer(skynet定时器)