继续按照skynet.start中的顺序分析。分析skynet定时器。启动各个线程都会做类似的初始化。
定时器分析
这里要做一个前置理解,定时器,是只需要关心最近即将到来的任务,而不用关心距离现在比较远的任务,例如
1.你注册了100个定时任务,定时任务按照时间排序后
2.A任务2s后触发,B任务3h后触发,C任务h后触发……
3.当刷新时间的时候,需要去拿到即将到来的任务而需要关心之后的任务,这里就是只需要关心A任务,也就是你不需要去遍历所有任务列表拿到即将发生的任务,因为B之后的任务包括B,时间间隔太远了都不需要去关心。
所以定时器的设计需要:
1.查询快
2.做好排序
3.删除做完的任务之后不影响之前结构
常见的设计例如使用最小堆,时间轮,红黑树,而skynet使用的是时间轮。
时间轮的设计就类似于我们生活中的钟表,大概说就是,秒针走一圈,分针走一格,分针走一圈,时针走一格。
运用到程序中,大概就是,
1.我们只关心秒针的任务。
2.秒针的一圈任务执行完了之后,拿分针一格的任务分配到秒针各秒中。这样就我们每次都只会从秒针的任务中拿到需要执行的任务,和第1点一致。
3.分针的一圈任务执行完了之后,拿时针一格的任务分配到分针各分中。
先看定时器结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #define TIME_NEAR_SHIFT 8 #define TIME_NEAR (1 << TIME_NEAR_SHIFT) #define TIME_LEVEL_SHIFT 6 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT) #define TIME_NEAR_MASK (TIME_NEAR-1) #define TIME_LEVEL_MASK (TIME_LEVEL-1)
struct timer_event { uint32_t handle; int session; };
struct timer_node { struct timer_node *next; uint32_t expire; };
struct link_list { struct timer_node head; struct timer_node *tail; };
struct timer { struct link_list near[TIME_NEAR]; struct link_list t[4][TIME_LEVEL];
struct spinlock lock; uint32_t time; uint32_t starttime; uint64_t current; uint64_t current_point; };
|
定时器初始化
skynet_start中调用 skynet_timer_init初始化,在我的《从头开始读skynet源码(1)》中有写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| void skynet_timer_init(void) { TI = timer_create_timer(); uint32_t current = 0; systime(&TI->starttime, ¤t); TI->current = current; TI->current_point = gettime(); }
static struct timer * timer_create_timer() { struct timer *r=(struct timer *)skynet_malloc(sizeof(struct timer)); memset(r,0,sizeof(*r));
int i,j;
for (i=0;i<TIME_NEAR;i++) { link_clear(&r->near[i]); }
for (i=0;i<4;i++) { for (j=0;j<TIME_LEVEL;j++) { link_clear(&r->t[i][j]); } }
SPIN_INIT(r)
r->current = 0;
return r; }
static inline struct timer_node * link_clear(struct link_list *list) { struct timer_node * ret = list->head.next; list->head.next = 0; list->tail = &(list->head);
return ret; }
|
启动定时器线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| static void start(int thread) { ... create_thread(&pid[1], thread_timer, m); ... }
static void * thread_timer(void *p) { struct monitor * m = p; skynet_initthread(THREAD_TIMER); for (;;) { skynet_updatetime(); skynet_socket_updatetime(); CHECK_ABORT wakeup(m,m->count-1); usleep(2500); if (SIG) { signal_hup(); SIG = 0; } } skynet_socket_exit(); pthread_mutex_lock(&m->mutex); m->quit = 1; pthread_cond_broadcast(&m->cond); pthread_mutex_unlock(&m->mutex); return NULL; }
static void wakeup(struct monitor *m, int busy) { if (m->sleep >= m->count - busy) { pthread_cond_signal(&m->cond); } }
|
可以看到,定时器就是每2.5毫秒刷新一次,至于定时任务的处理,看到skynet_updatetime。这里就是skynet定时器的精华。
skynet_updatetime,定时器刷新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
| void skynet_updatetime(void) { uint64_t cp = gettime(); if(cp < TI->current_point) { skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point); TI->current_point = cp; } else if (cp != TI->current_point) { uint32_t diff = (uint32_t)(cp - TI->current_point);
TI->current_point = cp; TI->current += diff; int i; for (i=0;i<diff;i++) { timer_update(TI); } } }
static void timer_update(struct timer *T) { SPIN_LOCK(T);
timer_execute(T);
timer_shift(T);
timer_execute(T);
SPIN_UNLOCK(T); }
static inline void timer_execute(struct timer *T) { int idx = T->time & TIME_NEAR_MASK; while (T->near[idx].head.next) { struct timer_node *current = link_clear(&T->near[idx]); SPIN_UNLOCK(T); dispatch_list(current); SPIN_LOCK(T); } }
static inline void dispatch_list(struct timer_node *current) { do { struct timer_event * event = (struct timer_event *)(current+1);
struct skynet_message message; message.source = 0; message.session = event->session; message.data = NULL; message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
skynet_context_push(event->handle, &message); struct timer_node * temp = current; current=current->next; skynet_free(temp); } while (current); }
int skynet_timeout(uint32_t handle, int time, int session) { if (time <= 0) { struct skynet_message message; message.source = 0; message.session = session; message.data = NULL; message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
if (skynet_context_push(handle, &message)) { return -1; } } else { struct timer_event event; event.handle = handle; event.session = session; timer_add(TI, &event, sizeof(event), time); }
return session; }
static void timer_add(struct timer *T,void *arg,size_t sz,int time) { struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz); memcpy(node+1,arg,sz);
SPIN_LOCK(T); node->expire=time+T->time; add_node(T,node);
SPIN_UNLOCK(T); }
static void add_node(struct timer *T,struct timer_node *node) { uint32_t time=node->expire; uint32_t current_time=T->time; if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) { link(&T->near[time&TIME_NEAR_MASK],node); } else { int i; uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT; for (i=0;i<3;i++) { if ((time|(mask-1))==(current_time|(mask-1))) { break; } mask <<= TIME_LEVEL_SHIFT; } link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } }
tatic inline void link(struct link_list *list,struct timer_node *node) { list->tail->next = node; list->tail = node; node->next=0; }
|
skynet_socket_updatetime 网络线程刷新时间
1 2 3 4 5 6 7 8 9
| void skynet_socket_updatetime() { socket_server_updatetime(SOCKET_SERVER, skynet_now()); }
void socket_server_updatetime(struct socket_server *ss, uint64_t time) { ss->time = time; }
|
至此,定时器方面的分析,大概完毕,下篇分享thread_socket,skynet网络线程。