agent-module-architecture
npx skills add https://github.com/tencentblueking/bk-ci --skill agent-module-architecture
Agent 安装分布
Skill 文档
Agent 构建机模块架构指南
模块定位: Agent 是 BK-CI 的构建机核心组件，由 Go 语言编写，负责与后端服务通信、接收构建任务、拉起 Worker 进程执行构建。
一、模块概述
1.1 核心职责
| 职责 | 说明 |
|---|---|
| 进程管理 | Daemon 守护 Agent 进程，确保持续运行 |
| 任务调度 | 从 Dispatch 服务拉取构建任务并执行 |
| Worker 管理 | 拉起 Worker（Kotlin JAR）执行实际构建逻辑 |
| 心跳上报 | 定期向后端上报 Agent 状态和环境信息 |
| 自动升级 | 检测并自动升级 Agent、Worker、JDK |
| 数据采集 | 通过 Telegraf 采集构建机指标数据 |
| Docker 构建 | 支持 Docker 容器化构建（Linux） |
1.2 与 Worker 的关系
âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ
â æå»ºæº (Build Machine) â
âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ¤
â âââââââââââ 宿¤ âââââââââââ â
â â Daemon â ââââââââââââ¶ â Agent â â
â â (Go) â â (Go) â â
â âââââââââââ ââââââ¬âââââ â
â â æèµ· â
â â¼ â
â âââââââââââ â
â â Worker â â
â â(Kotlin) â â
â ââââââ¬âââââ â
â â æ§è¡ â
â â¼ â
â ââââââââââââââââââââââââ â
â â æä»¶ä»»å¡ / èæ¬ä»»å¡ â â
â ââââââââââââââââââââââââ â
âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ
- Agent (Go): 负责进程调度、与后端通信、环境管理
- Worker (Kotlin): 负责具体构建任务执行、插件运行、日志上报
二、目录结构
src/agent/
├── agent/                          # 主 Agent 模块
│   ├── src/
│   │   ├── cmd/                    # 入口程序
│   │   │   ├── agent/main.go       # Agent 主程序入口
│   │   │   ├── daemon/main.go      # Daemon 守护进程入口
│   │   │   ├── installer/main.go   # 安装程序入口
│   │   │   └── upgrader/main.go    # 升级程序入口
│   │   ├── pkg/                    # 核心包
│   │   │   ├── agent/              # Agent 核心逻辑
│   │   │   ├── api/                # API 客户端
│   │   │   ├── collector/          # 数据采集
│   │   │   ├── config/             # 配置管理
│   │   │   ├── cron/               # 定时任务
│   │   │   ├── i18n/               # 国际化
│   │   │   ├── imagedebug/         # Docker 镜像调试
│   │   │   ├── job/                # 构建任务管理
│   │   │   ├── job_docker/         # Docker 构建
│   │   │   ├── pipeline/           # Pipeline 任务
│   │   │   ├── upgrade/            # 升级逻辑
│   │   │   ├── upgrader/           # 升级器实现
│   │   │   └── util/               # 工具函数
│   │   └── third_components/       # 第三方组件管理
│   ├── go.mod
│   ├── Makefile
│   └── README.md
├── agent-slim/                     # 轻量版 Agent
│   └── cmd/slim.go
└── common/                         # 公共工具
    └── utils/
        ├── fileutil/
        └── slice.go
三、核心组件详解
3.1 Daemon 守护进程
文件: src/cmd/daemon/main.go
Daemon 负责守护 Agent 进程，确保其持续运行：
// watch is the Unix daemon loop. It checks once at startup, and then on every
// agentCheckGap tick (5 seconds according to the original comment), whether
// the agent is alive, delegating the check and relaunch to
// doCheckAndLaunchAgent.
//
// Every check runs while holding the shared "total" file lock. If that lock
// cannot be taken the check is skipped and retried on the next tick, so the
// check never runs unguarded and Unlock is never called after a failed Lock.
func watch(isDebug bool) {
	totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))

	// checkOnce performs a single guarded liveness check.
	checkOnce := func() {
		if err := totalLock.Lock(); err != nil {
			return
		}
		defer totalLock.Unlock()
		doCheckAndLaunchAgent(isDebug)
	}

	// First check runs immediately instead of waiting for the first tick.
	checkOnce()

	checkTimeTicker := time.NewTicker(agentCheckGap)
	defer checkTimeTicker.Stop()
	for range checkTimeTicker.C {
		checkOnce()
	}
}
// doCheckAndLaunchAgent probes the agent's file lock (agent.lock in the
// runtime directory) and launches the agent when the lock is free.
//
// Being able to take the lock means no agent currently holds it, i.e. the
// agent is not running. When the probe fails or the lock is already held,
// nothing is done.
func doCheckAndLaunchAgent(isDebug bool) {
	agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir()))
	locked, err := agentLock.TryLock()
	if err != nil || !locked {
		return
	}
	// A new flock is created on every call, so the probe lock must be released
	// here; otherwise each check would leave a held lock behind.
	defer agentLock.Unlock()

	logs.Warn("agent is not available, will launch it")
	// Only the error is needed here; the returned process handle is unused.
	if _, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug); err != nil {
		logs.WithError(err).Error("launch agent failed")
	}
}
Windows 实现: 使用 github.com/kardianos/service 库实现 Windows Service
3.2 Agent 核心流程
文件: src/pkg/agent/agent.go
func Run(isDebug bool) {
// 1. åå§åé
ç½®
config.Init(isDebug)
third_components.Init()
// 2. åå§åå½é
å
i18n.InitAgentI18n()
// 3. 䏿¥å¯å¨ï¼éè¯ç´å°æåï¼
_, err := job.AgentStartup()
if err != nil {
for {
_, err = job.AgentStartup()
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
}
// 4. å¯å¨åå°ä»»å¡
go collector.Collect() // æ°æ®éé
go cron.CleanJob() // 宿æ¸
ç
go cron.CleanDebugContainer() // æ¸
çè°è¯å®¹å¨
// 5. 主循ç¯ï¼Ask 请æ±
for {
doAsk()
config.LoadAgentIp()
time.Sleep(5 * time.Second)
}
}
3.3 Ask 统一请求模式
Agent 使用 Ask 模式统一处理多种任务：
// doAsk performs one Ask round trip: it builds the request from the enabled
// features, heartbeat info and upgrade info, sends it, decodes the response
// and hands it to doAgentJob.
//
// When the request fails nothing is dispatched; Run calls doAsk again after
// its sleep, which acts as the retry.
func doAsk() {
	// Build the Ask request.
	enable := genAskEnable()
	heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)
	result, err := api.Ask(&api.AskInfo{
		Enable:  enable, // enabled features
		Heart:   heart,  // heartbeat info
		Upgrade: upgrad, // upgrade info
	})
	if err != nil || result == nil {
		// No usable response: do not touch result.Data.
		return
	}

	// Decode the response payload.
	resp := new(api.AskResp)
	util.ParseJsonToData(result.Data, &resp)

	// Dispatch the jobs contained in the response.
	doAgentJob(enable, resp)
}
// doAgentJob fans the Ask response out to the individual handlers. Every
// handler runs in its own goroutine and is only started when its part of the
// response is present (and, where a switch exists, enabled in the request).
func doAgentJob(enable api.AskEnable, resp *api.AskResp) {
	// Heartbeat response.
	if heartResp := resp.Heart; heartResp != nil {
		go agentHeartbeat(heartResp)
	}

	// Build job: requires a build in the response and a build type other than none.
	buildAssigned := resp.Build != nil && enable.Build != api.NoneBuildType
	if buildAssigned {
		go job.DoBuild(resp.Build)
	}

	// Upgrade job; it is told whether a build was just assigned.
	if upgradeItem := resp.Upgrade; enable.Upgrade && upgradeItem != nil {
		go upgrade.AgentUpgrade(upgradeItem, buildAssigned)
	}

	// Pipeline job.
	if pipelineJob := resp.Pipeline; enable.Pipeline && pipelineJob != nil {
		go pipeline.RunPipeline(pipelineJob)
	}

	// Docker image debug.
	if debugInfo := resp.Debug; enable.DockerDebug && debugInfo != nil {
		go imagedebug.DoImageDebug(debugInfo)
	}
}
3.4 构建任务执行
文件: src/pkg/job/build.go
// DoBuild runs a build job. It takes the task lock, checks the parallel task
// limits, registers the build with the matching manager, releases the lock
// and then starts either a Docker build or a normal build.
//
// The lock is released on every path, including the one where no build can
// be started because the limits are reached.
func DoBuild(buildInfo *api.ThirdPartyBuildInfo) {
	// Take the task lock before checking the parallel limits.
	BuildTotalManager.Lock.Lock()

	dockerCanRun, normalCanRun := CheckParallelTaskCount()
	switch {
	case buildInfo.DockerBuildInfo != nil && dockerCanRun:
		// Docker build.
		GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
		BuildTotalManager.Lock.Unlock()
		runDockerBuild(buildInfo)
	case normalCanRun:
		// Normal build. NOTE(review): a Docker build that cannot run also
		// lands here when normalCanRun is true, as in the original — confirm
		// this fallback is intended.
		GBuildManager.AddPreInstance(buildInfo.BuildId)
		BuildTotalManager.Lock.Unlock()
		runBuild(buildInfo)
	default:
		// Nothing can run: still release the lock so later callers are not blocked.
		BuildTotalManager.Lock.Unlock()
	}
}
// runBuild starts the Worker process for a normal (non-Docker) build.
//
// It makes sure the worker jar is present (restoring it from the upgrade
// directory when possible), assembles the environment variables passed to the
// worker, creates the build temp directory and calls doBuild. It returns the
// error from creating the temp directory, or nil.
func runBuild(buildInfo *api.ThirdPartyBuildInfo) error {
	// Check that the worker jar exists.
	agentJarPath := config.BuildAgentJarPath()
	if !fileutil.Exists(agentJarPath) {
		// Self-heal: copy the jar back from the upgrade directory if it is there.
		upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile
		if fileutil.Exists(upgradeWorkerFile) {
			fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true)
		}
	}

	// Environment variables for the worker.
	goEnv := map[string]string{
		"DEVOPS_AGENT_VERSION":     config.AgentVersion,
		"DEVOPS_WORKER_VERSION":    third_components.Worker.GetVersion(),
		"DEVOPS_PROJECT_ID":        buildInfo.ProjectId,
		"DEVOPS_BUILD_ID":          buildInfo.BuildId,
		"DEVOPS_VM_SEQ_ID":         buildInfo.VmSeqId,
		"DEVOPS_FILE_GATEWAY":      config.GAgentConfig.FileGateway,
		"DEVOPS_GATEWAY":           config.GetGateWay(),
		"BK_CI_LOCALE_LANGUAGE":    config.GAgentConfig.Language,
		"DEVOPS_AGENT_JDK_8_PATH":  third_components.Jdk.Jdk8.GetJavaOrNull(),
		"DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
	}

	// Create the temp directory; without it the build cannot be started.
	tmpDir, err := systemutil.MkBuildTmpDir()
	if err != nil {
		return err
	}

	// NOTE(review): if doBuild reports an error, propagate it here — its
	// signature is not visible in this excerpt.
	doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)
	return nil
}
3.5 配置管理
文件: src/pkg/config/config.go
Agent 配置从 .agent.properties 文件加载：
// Configuration keys: the property names under which the agent's settings
// are stored in the .agent.properties file.
const (
	KeyProjectId         = "devops.project.id"
	KeyAgentId           = "devops.agent.id"
	KeySecretKey         = "devops.agent.secret.key"
	KeyDevopsGateway     = "landun.gateway"
	KeyDevopsFileGateway = "landun.fileGateway"
	KeyTaskCount         = "devops.parallel.task.count"
	KeyEnvType           = "landun.env"
	KeySlaveUser         = "devops.slave.user"
	KeyDockerTaskCount   = "devops.docker.parallel.task.count"
	KeyLanguage          = "devops.language"
	// ...
)
// AgentConfig is the agent's configuration structure. Its BuildType,
// ProjectId, AgentId and SecretKey fields are the values sent as
// authentication headers by GetAuthHeaderMap.
type AgentConfig struct {
	Gateway                 string
	FileGateway             string
	BuildType               string
	ProjectId               string
	AgentId                 string
	SecretKey               string
	ParallelTaskCount       int
	DockerParallelTaskCount int
	EnableDockerBuild       bool
	Language                string
	// ...
}
// AgentEnv holds environment information about the machine the agent runs on.
type AgentEnv struct {
OsName string
// agentIp is unexported; other packages read it through GetAgentIp().
agentIp string
HostName string
AgentVersion string
AgentInstallPath string
OsVersion string
CPUProductInfo string
GPUProductInfo string
}
3.6 API 客户端
文件: src/pkg/api/api.go
// buildUrl turns an API path into a full URL by prefixing it with the
// configured gateway address.
func buildUrl(url string) string {
	gateway := config.GetGateWay()
	return gateway + url
}
// AgentStartup reports the agent's startup to the Environment service,
// sending host name, IP, detected OS and the agent/worker versions together
// with the authentication headers.
func AgentStartup() (*httputil.DevopsResult, error) {
	endpoint := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup")

	payload := &ThirdPartyAgentStartInfo{
		HostName:      config.GAgentEnv.HostName,
		HostIp:        config.GAgentEnv.GetAgentIp(),
		DetectOs:      config.GAgentEnv.OsName,
		MasterVersion: config.AgentVersion,
		SlaveVersion:  third_components.Worker.GetVersion(),
	}

	request := httputil.NewHttpClient().Post(endpoint).Body(payload, false)
	authed := request.SetHeaders(config.GAgentConfig.GetAuthHeaderMap())
	return authed.Execute(nil).IntoDevopsResult()
}
// WorkerBuildFinish reports a finished build to the Dispatch service, posting
// the given build info with the authentication headers.
func WorkerBuildFinish(buildInfo *ThirdPartyBuildWithStatus) (*httputil.DevopsResult, error) {
	endpoint := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish")

	request := httputil.NewHttpClient().Post(endpoint).Body(buildInfo, false)
	authed := request.SetHeaders(config.GAgentConfig.GetAuthHeaderMap())
	return authed.Execute(nil).IntoDevopsResult()
}
// Ask sends the unified Ask request to the Dispatch service and returns the
// decoded agent result.
func Ask(info *AskInfo) (*httputil.AgentResult, error) {
	endpoint := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask")

	request := httputil.NewHttpClient().Post(endpoint).Body(info, bodyEq)
	authed := request.SetHeaders(config.GAgentConfig.GetAuthHeaderMap())
	return authed.Execute(askRequest.Resp).IntoAgentResult()
}
3.7 升级机制
文件: src/pkg/upgrade/upgrade.go
// AgentUpgrade is the main upgrade routine. It upgrades only when the
// response contains a change, no build was assigned in the same Ask round,
// the task lock can be taken without waiting, and no job is running.
func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) {
	upItems := &upgradeItems{
		Agent:          upgradeItem.Agent,
		Worker:         upgradeItem.Worker,
		Jdk:            upgradeItem.Jdk,
		DockerInitFile: upgradeItem.DockerInitFile,
	}

	// Nothing to upgrade, or a build was just assigned: skip this round.
	if upItems.NoChange() || hasBuild {
		return
	}

	// Hold the task lock for the whole upgrade; if it is busy, skip this round.
	if acquired := job.BuildTotalManager.Lock.TryLock(); !acquired {
		return
	}
	defer job.BuildTotalManager.Lock.Unlock()

	if !job.CheckRunningJob() {
		// Download the upgrade files, then apply them.
		downloadUpgradeFiles(upItems)
		DoUpgradeOperation(upItems)
	}
}
3.8 数据采集
文件: src/pkg/collector/collector.go
使用 Telegraf 进行数据采集：
// Collect runs metric collection for as long as the agent lives. It returns
// immediately when the collector is switched off in the configuration.
//
// Each round runs doAgentCollect with a fresh cancellable context. A watcher
// goroutine cancels that context when an IP-change event arrives on
// ipChan.DChan, so the next round starts over with a new context.
func Collect() {
	if !config.GAgentConfig.CollectorOn {
		logs.Info("agent collector off")
		return
	}

	for {
		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			select {
			case <-ipChan.DChan:
				// IP changed: stop the current collection round.
				cancel()
			case <-ctx.Done():
				// Round already ended: do not keep this goroutine waiting.
			}
		}()

		doAgentCollect(ctx)
		// Release the context and the watcher if the round ended by itself.
		cancel()
	}
}
func doAgentCollect(ctx context.Context) {
// çæ Telegraf é
ç½®
configContent, _ := genTelegrafConfig()
// åå§å Telegraf Agent
tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)
// è¿è¡éé
for {
tAgent.Run(ctx)
time.Sleep(telegrafRelaunchTime)
}
}
四、数据类型定义
4.1 构建信息
文件: src/pkg/api/type.go
// BuildJobType is the string-valued kind of build job.
type BuildJobType string
// Known BuildJobType values. NoneBuildType is the value doAgentJob compares
// against to decide that no build may be started.
const (
AllBuildType BuildJobType = "ALL"
DockerBuildType BuildJobType = "DOCKER"
BinaryBuildType BuildJobType = "BINARY"
NoneBuildType BuildJobType = "NONE"
)
// ThirdPartyBuildInfo is the third-party build information passed to DoBuild.
// Fields are (de)serialized under the JSON keys given in their tags.
type ThirdPartyBuildInfo struct {
ProjectId string `json:"projectId"`
BuildId string `json:"buildId"`
VmSeqId string `json:"vmSeqId"`
Workspace string `json:"workspace"`
PipelineId string `json:"pipelineId"`
// DockerBuildInfo is a pointer; DoBuild treats a non-nil value as a Docker build.
DockerBuildInfo *ThirdPartyDockerBuildInfo `json:"dockerBuildInfo"`
// ExecuteCount is a pointer, so it may be nil.
ExecuteCount *int `json:"executeCount"`
ContainerHashId string `json:"containerHashId"`
}
// ThirdPartyDockerBuildInfo is the Docker-specific part of a build, carried
// in ThirdPartyBuildInfo.DockerBuildInfo.
type ThirdPartyDockerBuildInfo struct {
AgentId string `json:"agentId"`
SecretKey string `json:"secretKey"`
Image string `json:"image"`
Credential Credential `json:"credential"`
Options DockerOptions `json:"options"`
ImagePullPolicy string `json:"imagePullPolicy"`
}
4.2 心跳信息
// AgentHeartbeatInfo is the agent's heartbeat information. Fields are
// serialized under the JSON keys given in their tags.
type AgentHeartbeatInfo struct {
MasterVersion string `json:"masterVersion"`
SlaveVersion string `json:"slaveVersion"`
HostName string `json:"hostName"`
AgentIp string `json:"agentIp"`
ParallelTaskCount int `json:"parallelTaskCount"`
AgentInstallPath string `json:"agentInstallPath"`
StartedUser string `json:"startedUser"`
TaskList []ThirdPartyTaskInfo `json:"taskList"`
DockerParallelTaskCount int `json:"dockerParallelTaskCount"`
DockerTaskList []ThirdPartyDockerTaskInfo `json:"dockerTaskList"`
}
// AgentHeartbeatResponse is the heartbeat response. Fields are deserialized
// from the JSON keys given in their tags.
type AgentHeartbeatResponse struct {
MasterVersion string `json:"masterVersion"`
SlaveVersion string `json:"slaveVersion"`
AgentStatus string `json:"agentStatus"`
ParallelTaskCount int `json:"parallelTaskCount"`
Envs map[string]string `json:"envs"`
Gateway string `json:"gateway"`
FileGateway string `json:"fileGateway"`
DockerParallelTaskCount int `json:"dockerParallelTaskCount"`
Language string `json:"language"`
}
五、跨平台支持
5.1 平台特定代码
Agent 通过 Go 的构建标签支持多平台：
src/pkg/config/
├── config.go            # 通用配置
├── config_darwin.go     # macOS 特定
├── config_linux.go      # Linux 特定
└── config_win.go        # Windows 特定
src/pkg/upgrader/
├── upgrader_darwin.go   # macOS 升级器
├── upgrader_unix.go     # Unix 升级器
└── upgrader_win.go      # Windows 升级器
5.2 构建命令
# Linux
make clean build_linux
# macOS
make clean build_macos
# Windows
build_windows.bat
生成的二进制文件：
- devopsDaemon_linux / devopsDaemon_macos / devopsDaemon.exe
- devopsAgent_linux / devopsAgent_macos / devopsAgent.exe
- upgrader_linux / upgrader_macos / upgrader.exe
六、与后端服务交互
6.1 API 端点
| 服务 | 端点 | 用途 |
|---|---|---|
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/startup | Agent 启动上报 |
| Dispatch | /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask | 统一 Ask 请求 |
| Dispatch | /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish | 构建完成上报 |
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines | Pipeline 任务 |
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download | 下载升级文件 |
6.2 认证头
// GetAuthHeaderMap returns the authentication headers sent with agent
// requests: build type, project ID, agent ID and secret key, each taken from
// the corresponding AgentConfig field. A new map is returned on every call.
func (a *AgentConfig) GetAuthHeaderMap() map[string]string {
	headers := make(map[string]string, 4)
	headers["X-DEVOPS-BUILD-TYPE"] = a.BuildType
	headers["X-DEVOPS-PROJECT-ID"] = a.ProjectId
	headers["X-DEVOPS-AGENT-ID"] = a.AgentId
	headers["X-DEVOPS-AGENT-SECRET-KEY"] = a.SecretKey
	return headers
}
七、开发规范
7.1 错误处理
// Standard error check: log the error, then return it wrapped with context.
if err != nil {
logs.WithError(err).Error("operation failed")
return errors.Wrap(err, "context message")
}
// Panic recovery: a deferred function that recovers and logs the panic value.
defer func() {
if err := recover(); err != nil {
logs.Error("panic: ", err)
}
}()
7.2 日志规范
// Log levels, plus the formatted and error-carrying variants.
logs.Debug("debug message")
logs.Info("info message")
logs.Infof("formatted: %s", value)
logs.Warn("warning message")
logs.Error("error message")
logs.WithError(err).Error("error with context")
7.3 并发模式
// Start background work in goroutines.
go collector.Collect()
go cron.CleanJob()
// 使用锁保护共享资源
BuildTotalManager.Lock.Lock()
defer BuildTotalManager.Lock.Unlock()
// Use a file lock for synchronisation between processes.
agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir))
locked, err := agentLock.TryLock()
7.4 新增功能开发
- 新增 API 调用：在 `src/pkg/api/api.go` 添加函数
- 新增数据类型：在 `src/pkg/api/type.go` 定义结构体
- 新增配置项：在 `src/pkg/config/config.go` 添加常量和字段
- 新增后台任务：在 `doAgentJob()` 中添加处理逻辑
八、控制脚本
# Linux 示例
scripts/linux/install.sh    # 安装
scripts/linux/start.sh      # 启动
scripts/linux/stop.sh       # 停止
scripts/linux/uninstall.sh  # 卸载
九、相关模块
| 模块 | 关系 | 说明 |
|---|---|---|
| Worker | 下游 | Agent 拉起 Worker 执行构建 |
| Environment | 上游 | Agent 状态管理、心跳上报 |
| Dispatch | 上游 | 构建任务分发 |
| Log | 下游 | 构建日志上报（通过 Worker） |