0%

俗事缠身,加上自己有点懒,skynet分析的事情拖了很久,之后尽快分析完。其实skynet是一个很优秀的框架,值得细读,多读(上加公司的老大这么对我说的,现在我也觉得是这样的)。skynet.star东西比较多,分四篇分析。下面直接进入正题。

启动服务器skynet_start()

start传入配置的线程数量,启动服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void 
skynet_start(struct skynet_config * config) {
...
// 调用start传入配置线程数量
// 这个线程数量就是工作线程数量
start(config->thread);

// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
}
}

下面进入到start的逻辑,在同一个文件,先看注释,了解大概流程。之后我们一个结构一个结构分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
static void
start(int thread) {
// 声明传入的线程数量thread+3,之后会看到为什么+3
pthread_t pid[thread+3];

// 创建服务器monitor
struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread; // 记录线程数量
m->sleep = 0;

// 创建对应数量的skynet_monitor 并初始化
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
// 初始化互斥锁
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
// 初始化条件变量
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}

// 这里就是额外+的三个线程
// 一条监视者线程,一条定时器线程,一条网络线程
create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);

// 这里是设程的比重,关于消息队列的调度
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
// 声明对应数量的工作线程环境
struct worker_parm wp[thread];

// 初始化工作线程
for (i=0;i<thread;i++) {
wp[i].m = m; // 关联服务器monitor
wp[i].id = i; // 关联线程id

// 配置线程比重,这个比重有什么用之后分析
// 如果配置了超过8条工作线程,会走else逻辑,比重都是0
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}

// 创建工作线程,这里pid[i+3],与上面对应
create_thread(&pid[i+3], thread_worker, &wp[i]);
}

// 等待监视者线程,定时器线程,网络线程结束
// 服务器正常运行,是不会结束的
for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL);
}

// 如果结束了,做清理工作
free_monitor(m);
}

经过上面可以知道,其实服务器模型就是线程池模型,启动了一条监视者线程,一条定时器线程,一条网络线程和配置的工作线程。要想知道线程是怎么工作的,要对上面各个线程的工作函数进行逐一分析即可。下篇分析monitor。

1
2
之前在上一家公司用的都是svn,现在使用gitlab。最近又在测试cicd,反复查询了一些命令,
也敲了一些命令,自己也记录一下,持续更新。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
git clone <addr> 					// 拷贝远程仓库到本地
git status // 查看仓库当前的状态,显示有变更的文件(有的情况下文件增减看不到)
git diff <file> // 比较文件的不同,即暂存区和工作区的差异。
git add . // 添加文件到暂存区
git add -f <file> // 在文件增加没有提示时,手动添加文件
git commit -m "xxx" // 将暂存区内容添加到仓库中
git push origin <branch> // 提交本地仓库到指定分支
git pull origin <branch> // 拉取远程分支更新本地仓库
git tag <v1.x.x> // 给本地打标签
git tag -d <v1.x.x> // 删除本地标签
git push origin <tag> // 提交本地标签到远程仓库
git push origin :refs/tags/<tag> // 删除远程仓库tag

回家的路上看了一下cpu的结构,想起了前两天和领导吃饭时聊到的问题,为什么数组遍历比链表快,明明都知道地址了。

其实要理解这个问题,从cpu的结构和内存角度来理解就很清晰了。

读取速度:缓存>内存(剧吐多少倍差距,得看什么cpu,大概都有100倍左右)。

** **cpu读取数据是按照缓存行读取到缓存的,简单来说就是cpu会把需要的数据加载到缓存中,查找数据时,会先从缓存找,找不到再到内存找。

而数组作为连续内存,cpu缓存会把一片连续的内存空间读入,这样连续内存的数组会更易于整块读取到缓存中,当进行遍历时,直接命中缓存。而链表是跳跃式的地址,很轻易就会跳出缓存,跑到内存中去查找数据。所以会慢很多。

看下面的程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <time.h>
#include <stdio.h>

void show_arr_addr(){
long arr[2][8];
for(int i=0; i<2; i++)
{
printf("row addr:%X\n", arr[i]);
printf("column addr: ");
for(int j=0; j<8; j++)
{
printf("%X, ", &arr[i][j]);
}
printf("\n");
}
}

int main() {
double start = 0, finish = 0;
long arr[100000][8];

show_arr_addr();
printf("---------------------------\n");

// 先遍历行,在遍历列
start=(double) clock();
for(int i=0; i<100000; i++)
for(int j=0; j<8; j++)
arr[i][j] = 1;

finish=(double)clock();
printf("use time:%.5f ms\n",finish-start);

// 先遍历列,再遍历行
start=(double) clock();
for(int j=0; j<8; j++)
for(int i=0; i<100000; i++)
arr[i][j] = 1;

finish=(double)clock();
printf("use time:%.5f ms\n",finish-start);
}

我是64位的cpu,所以cpu读取一次是按照64根总线,也就是64位。我构造了一个二维数组,每行是64位,可以看数组的地址,是连续的。

下面先进行遍历,两种遍历,先遍历列会跳出上次读取的缓存行。所以会比先遍历行慢。链表的道理也是一样,如果地址是存在缓存之外的,就会花费更多的时间。

背景

我不是从头开始搭建,而是在已有部分条件的情况下进行,因为是我自己一个人负责集群中,一个服务器的重写,领导就叫我在gitlab上创建仓库,然后自己提交代码。我之前是没用过gitlab的,上一家公司的工作用的是svn,所以对我来说,一切都是未知。我的项目是golang写的。
我的目的是:把项目打包成镜像—->上传到镜像仓库—->部署到k8s

先建立仓库

登录gitlab—->选择New project,会到一下页面,按照自己的情况填写
在这里插入图片描述

这里有一点需要注意,当你创建完项目之后,会看到一些指引,例如叫你上传一个readme.md。我刚开始是没有权限的,所以什么也弄不了,直到你获得权限,你就可以操作仓库了。

获得权限后,先拉仓库到本地

在这里插入图片描述

进入你的工程,选择clone,然后选择自己想要的方式,使用git clone拉取仓库到你的本地。拉取之后,就可以用git add . —->git commit -m “xxxx” —->git push origin master提交到仓库。
注意:一般git 提交的时候,需要忽略一些文件,只需要在项目的目录添加一个.gitignore即可。
我提交完的文件大概如下,其中只需要知道两个
在这里插入图片描述
.gitignore 是用来忽略一些不需要提交的文件
.gitlab-ci.yaml 是用来自动部署的

先讲解一下gitlab cicd

其实这个东西不难,不知道网上为什么那么多解释,也不能写的都是不易于新手理解的一长串七七八八的概念。cicd需要两个东西:.gitlab-ci.yaml文件、gitlab-runner

.gitlab-ci.yaml
其实就是描述了你需要的自动部署过程,例如,你需要打包docker镜像,然后上传,那你就需要在gitlab-ci,yaml中使用它的语法去描述这个过程即可。
gitlab-runner
这个东西更易于理解,不知道网上哪些什么鬼解释。你既然写了自动部署脚本,那就需要有东西去执行你写的自动部署脚本,而gitlab-runner就是这个东西(类似于一个k8s中的pod,如果不对欢迎指出)。

其实这两个就是很简单的东西,反正我理解完,说出来就这么些。gitlab-runner跑的地方,就是你的gitlab项目地址,也就是说,你写的脚本就相当于在一台linux机器中,有一个你的项目的文件夹,你就在这个文件夹下写脚本(这是我自己的理解,我也的确是这样做的,如果有什么不对的地方欢迎讨论),你完全可以在你的.gitlab-ci.yaml中写脚本用ls,pwd的命令看就知道了。

部署阶段

编写.gitlab-ci.yaml文件

一开始我也不知道,上来就是直接用了个golang的cicd的demo(这个东西不用在gitlab添加cicd的时候就可以选择demo),执行之后,发现其实.gitlab-ci.yaml写的就是shell脚本(当时不知道,其实这个是和gitlab-runner相关的,后面再说)。这就简单了啊。
关于语法,这里不做说明
由于是项目,这里只能贴出部分
整个脚本分为三个阶段

1
2
3
4
stages:
- build
- docker_publish
- deploy

build代码上传后,直接写脚本go build,例如我的文件中这样写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
build:
image: golang:1.15.6
stage: build
before_script:
- mkdir -p $GOPATH/src/$(dirname $REPO_NAME)
- ln -svf $CI_PROJECT_DIR $GOPATH/src/$REPO_NAME
- cd $GOPATH/src/$REPO_NAME
script:
- pwd
- ls
- cd realTimeMsg
- go mod download
- go build -o $CI_PROJECT_DIR/RtmServer
artifacts:
paths:
- RtmServer
expire_in: 2 mos

这个很好理解,主要看script,一看便知。

docker_pulish将输出打包成镜像,这个就需要在docker中使用docker,要使用services如下。

这个构建也不难,在gitlab中上传你写好的Dockerfile后直接build即可。

1
2
3
4
5
6
7
8
9
docker_publish:
image: docker:latest
services:
- docker:dind
script:
...
- docker build -f Dockerfile --pull -t xxx
- docker image ls
- docker push xxx

这里需要注意一些,在.gitlab-ci.yaml文件中,需要定义的环境变量可以在setting—->cicd里面设置
在这里插入图片描述

deploy部署,我碰到的最大的问题就在这里

我的部署脚本中,就是无法dev的k8s中部署,我使用脚本kubectl cluster-info,发现我的脚本跑在的地方根本不是我的dev k8s,所以肯定无法部署。这种情况下我就去查了。
在这里插入图片描述
出现这种问题的原因是:gitlab-runner没有在我想要的环境中跑。所以无法关联到集群。
我在项目的部署的时候,也没有创建gitlab-runner,但是我的部署脚本的确跑起来了。
经过查询,原来是我参与的项目中有默认的共享的gitlab-runner,直接用这个跑了。

我首先尝试了把我的项目添加到k8s,也就是这个按钮,原来是add,我添加之后变成这个
在这里插入图片描述
添加方法在这里:https://segmentfault.com/a/1190000020947651,我是参考了这个文章。
但是并没有解决我的问题。

最后我在dev 的k8s环境中,给我的项目注册了一个gitlab-runner,我的开发环境已经存在gitlab-runner,如果你的不存在,就安装一下。
之后这样搞先打开setting—->cicd—->runners
在这里插入图片描述
然后使用gitlab-runner register
在这里插入图片描述
输入之前runner那张图的url
输入之前runner那张图的token
出入描述
输入tag,这个很重要,.gitlab-ci.yaml脚本通过tag关联runner
输入选择的执行方式,我选了shell

注册完之后会发现setting—->cicd—->runners多了一个runner
在这里插入图片描述
到这里还是不能用,就算在.gitlab-ci.yaml文件中关联了这个tag。这样关联,加上tags即可

1
2
3
4
5
deploy:
image: dtzar/helm-kubectl:2.9.1
stage: deploy
tags:
- goRtm

因为这个runner并不能跑起来,如果能够运行,则是前面是绿色的,tag改过了,和上图对应的不一样,说明一下。
在这里插入图片描述
还要调用gitlab-runner run,调用之后就变成绿色的就可以用了
在这里插入图片描述
在这里插入图片描述
做完这些,我就可以在提交代码后,自动部署到我想要的k8s中了。

因为文章涉及到公司项目,很多东西不能交代清楚。大概只能这样写了。

项目中redis直接是使用了aws的redis服务,redis是集群,也就是golang连接redis集群碰到的坑,如下

首先我使用的是redisgo,因为看着api比较方便,比较像redis-cli,而且aws也推荐。然后一开始连接也没事,我部署了不下10遍,然而有一天部署时。服务器在操作redis时忽然报错。“(error) MOVED xxxx xxx.xxx.xxx.xxx:xxxx”,经过查询得知这是由于redis没有选择连接集群导致的,再次查询,发现aws官方推荐了两个redis库,redisgo和go-redis两个库,redisgo是不支持集群的(太坑了),而且我部署了那么多次都没问题。可能一直都连到master上了,忽然就连到slave就报错了。还好发现的早。

go-redis/redis连接数据库的时候,官方的demo给的就是一堆端口,

1
2
3
4
5
6
7
8
9
import "github.com/go-redis/redis/v8"

rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

// To route commands by latency or randomly, enable one of the following.
//RouteByLatency: true,
//RouteRandomly: true,
})

这个是可以填好多地址的,只要把自己的地址填入string的切片即可,如果只有一个地址,就只填一个(项目中redis集群可能对外只暴露一个地址)

1
2
3
clusterClient = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"your addr1"},
})

redis.NewClusterClient这个接口默认是连接master的,如果只想连接slave,则通过参数控制,具体查看接口的说明即可。

** ** 最后切换了包之后,成功解决了redis集群连接的问题。

在项目中使用time包时,睡眠时间时想要通过配置文件获取,然而整数与时间相乘又报错。如下

查询time包发现

1
2
3
4
// A Duration represents the elapsed time between two instants
// as an int64 nanosecond count. The representation limits the
// largest representable duration to approximately 290 years.
type Duration int64

即,其实就是一个int64的数。最后先把需要的数字进行转换,这样解决即可

1
err = client.Set(ctx, key, data, time.Duration(ex)*time.Second).Err()

最近改写服务器,集群中服务器是nodejs写的,把其中一个服务器改成golang写的,碰到了nodejs aes192加密,再golang中解密碰到的坑,如下。

在nodejs中是这样加密的

1
2
3
4
5
6
const encrypt = (toEncrypt) => {
const cipher = crypto.createCipher('aes192', cryptoKey);
let encrypted = cipher.update(toEncrypt, 'utf8', 'hex');
encrypted += cipher.final('hex');
return encrypted;
};

发现golang中根本不知道aes192对应的到底是aes192中的那种加密模式,经过查询接口已经废弃

使用新的接口,原来的接口文档已经找不到了,刚开始顺着这个方法一直找,首先AES 有五种加密模式:电码本模式(Electronic Codebook Book (ECB))、密码分组链接模式(Cipher Block Chaining (CBC))、计算器模式(Counter (CTR))、密码反馈模式(Cipher FeedBack (CFB))和输出反馈模式(Output FeedBack (OFB)),要查到具体nodejs中使用的到底是哪个方法,查看新的接口说明,这里是重点一,通过接口说明,一步一步找出具体调用的是五个模式中的哪个

然后查看用的到底是什么,可以看到,用的是DES-192-CBC

然后查看对应的golang的解密方法,发现,必须要给定密钥,和向量。而原来的nodejs接口,只需要给一个字符串就可以了,再次查询资料,最后在第三方的资料中,找到

crypto.createCipher(algorithm, password):用给定的算法和密钥,创建并返回一个Cipher加密算法的对象。参数:algorithm算法是依赖OpenSSL库支持的算法, 例如: ‘aes192’算法等,password是用来派生key和iv的,它必须是一个 ‘binary’二进制格式的字符串或者是一个Buffer可以看到给定password参数,是用来生成key和iv的,具体怎么生成的,无从得知,或者说,要搞懂这个太麻烦了。

所以最后得出的结论是:nodejs使用原来的接口根本无法在golang中解密,所以必须修改nodejs的接口为新的接口。这里是重点二

将nodejs的接口修改为

1
2
3
4
5
6
encrypt(toEncrypt) {
const cipher = crypto.createCipheriv('aes192', this.cryptoKey, this.cryptoVi);
let encrypted = cipher.update(toEncrypt, 'utf8', 'hex');
encrypted += cipher.final('hex');
return encrypted;
}

然后在golang中用同样的密钥和向量解密,这个解密方法不难,随便搜索即可找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//对密文删除填充
func unPadDing(cipherText []byte) []byte {
//取出密文最后一个字节end
end := cipherText[len(cipherText)-1]
//删除填充
cipherText = cipherText[:len(cipherText)-int(end)]
return cipherText
}

//AEC解密(CBC模式)
func aesCbcDecrypt(cipherText []byte) string {
//指定解密算法,返回一个AES算法的Block接口对象
block, err := aes.NewCipher(cryp2Key)
if err != nil {
panic(err)
}
//指定分组模式,返回一个BlockMode接口对象
blockMode := cipher.NewCBCDecrypter(block, iv)
//解密
plainText := make([]byte, len(cipherText))
blockMode.CryptBlocks(plainText, cipherText)
//删除填充
plainText = unPadDing(plainText)

return string(plainText)
}

这样就可以成功解密了。

最后:最坑的就是nodejs的接口,找了好久都找不到解密方法,原来要用新接口,而且必须找到接口到底对应哪个加密方法才能在golang解密。

其实我已经安装了很久了, 久到我已经忘记怎么安装了, 今天在k8s中调试时, 要重新装golang, 上网查了一下, 发现太多文章都是不太方便的, 例如现在windows下载好包, 再考到linux下解压, 有这么麻烦么, 下面直接上流程

1.先下载golang的包

这里网上好多文章都是在windows下搜索, 下载好拷贝过去, 其实不用这么干, 在linux中直接下载就好了

我是debian 64位的系统, 工作目录是/usr/local. 这样下

1
2
3
4
5
6
// 如果你看到这篇文章,可能已经过了很久,go的版本自己看着修改
wget https://storage.googleapis.com/golang/go1.15.7.linux-amd64.tar.gz

// 如果没有wget, 先进行以下步骤, 然后再再下载golang
// apt-get update
// apt-get install wget

2.解压golang

1
2
// 你下载再哪里就在哪里解压
tar -C /usr/local -xzf go1.15.7.linux-amd64.tar.gz

3.设置环境变量

1
2
3
4
5
6
export PATH=$PATH:/usr/local/go/bin

//这里写你自己的工作目录
export GOPATH=/usr/local/go_project

export GOROOT=/usr/local/go

最后用go version 查看是否安装成功

打完收工.

在项目中,需要使用golang写http服务器并部署在k8s中,在server deployment.yaml中存在livenessProbe(存活探针)以及readinessProbe(就绪探针),这两个我都是写了向服务器发送一个http请求,如果服务器收到并回复,则表示成功,但是在使用过程中,日志一直输出,这些会干扰日志的查看。

经过查询与测试,最后修复,做此纪录,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func healthcheck(w http.ResponseWriter, r *http.Request) {
//fmt.Fprintf(w, "i m live") 原来就是因为多了这行,然后一直输出,屏蔽了即可
w.WriteHeader(200)
}

func main() {
router := http.NewServeMux()
router.HandleFunc("/healthcheck", healthcheck)
port := fmt.Sprint(":", *config.Conf.HttpPort)
srv := &http.Server{
Addr: port,
Handler: router,
}

logger.Info("[HTTP] http server listen", port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("[HTTP] server listen on", port, ", err:", err)
os.Exit(1)
}
}

在最近的项目中,使用了go 搭建了web服务器,所以”优雅的关闭服务器”,经过查资料,发现go在1.8之后,http包已经有Shutdown()方法,但是使用起来还是有一些要注意的地方。

如果对go很熟悉的,对这些包很熟悉的人,肯定不会碰到这个问题。只怪我太菜。。。碰到了坑。下面直接上代码。

查询资料的时候,在网上看到这样的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
func gracefullShutdown(server *http.Server, logger *log.Logger, quit <-chan os.Signal, done chan<- bool) {
<-quit
logger.Println("Server is shutting down...")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
logger.Fatalf("Could not gracefully shutdown the server: %v\n", err)
}
close(done)
}

像这种写法,是启动了别的goroutin去执行gracefullShoudown,所以需要额外的一个done channel来阻塞主线程。我们的http服务器必然会调用这个ListenAndServer(如下),当调用了gracefullShutdown中server.Shutdown(ctx),ListenAndServer会直接返回,而主线程直接结束了的话,别的线程也就会跟着一起结束。也就没有了等待服务器关闭这么一说法了。

1
2
3
4
5
6
7
8
9
10
11
router := http.NewServeMux()
router.HandleFunc("/healthcheck", healthcheck)
port := fmt.Sprint(":8080")
srv := &http.Server{
Addr: port,
Handler: router,
}

if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("[HTTP] server listen on", port, ", err:", err)
}

** 注意:server.ListenAndServe() 方法在 Shutdown 时会立刻返回,Shutdown 方法会阻塞至所有连接闲置或 context 完成,所以 Shutdown 的方法要写在主 goroutine 中或者。如果像上面一样使用在新的goroutine中,则需要自己写好阻塞主携程的方法。**


最后附上完整使用新goroutine监听关闭服务器的demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)

func gracefullShutdown(server *http.Server, quit <-chan os.Signal, done chan<- bool) {
<-quit
// 30s,让服务器做一些清理操作
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
fmt.Println("server.Shutdown err ..... ", err)
}
//do Something :
fmt.Println("do something start ..... ", time.Now())
time.Sleep(5 * time.Second)
fmt.Println("do something end ..... ", time.Now())
close(done)
}

func main() {
var (
done = make(chan bool, 1)
quit = make(chan os.Signal, 1)
)

router := http.NewServeMux()
port := fmt.Sprint(":36010")
srv := &http.Server{
Addr: port,
Handler: router,
}

// 给信号挖个坑,如果抓到这些信号就退出程序
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go gracefullShutdown(srv, quit, done)

fmt.Println("[HTTP] http server listen", port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Println("ListenAndServe err ..... ", err)
os.Exit(1)
}
<-done
fmt.Println("[HTTP] Showdown end")
}