本文从代码的大的整体组织上来熟悉containerd项目
containerd项目总的说是一个cs模式的原生控制台程序组。containerd作为服务端来接收处理client的各种请求,如常用的拉取推送镜像,创建查询停止容器,生成快照,发送消息等。client/server之间通过grpc和ttrpc框架进行交互。
我们可以先看一下contanerd源代码中的cmd下文件夹如图:
每一个目录都会生成一个二进制文件:
containerd 为服务主程序。
containerd-shim-runc-v2: 为主程序和runtime程序之间交互的垫片二进制
contianerd-stress:不是实际生产使用的程序,而是为了对containerd进行压力测试使用
ctr: 是containerd的客户端二进制,可以发送各种命令请求。上一个用法图:
我们看到了项目的最终输出的物理文件。那么具体的交互逻辑或者说流程是什么样的。其实每个具体的功能都是通过各个相应的插件来完成。containerd背后有各种标准如oci、cni、csi等,采用插件的形式方便了各个供应商扩展自己的功能。我们先从静态代码上梳理一下.在项目的core目录下包含了containerd实现的大模块,如容器、内容、差异、镜像、元数据存储、租约、指标、挂载点、镜像注册中心、快照、沙箱、运行时。
我们以content模块为例将进行探索。
在core/content/content.go中抽象出来了content的接口类型如:
type Store interface {
Manager
Provider
IngestManager
Ingester
}
这个是个接口聚合,每个都可以展开成一个具体接口或者接口组合。
如:
// InfoProvider provides info for content inspection.
type InfoProvider interface {
// Info will return metadata about content available in the content store.
//
// If the content is not present, ErrNotFound will be returned.
Info(ctx context.Context, dgst digest.Digest) (Info, error)
}
我们刚才说每个功能都是由插件实现(插件会在server启动时加载,先埋下伏笔)那么进入plugins/content/local/store.go,可以看到它实现了上面的InfoProvider接口
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
p, err := s.blobPath(dgst)
if err != nil {
return content.Info{}, fmt.Errorf("calculating blob info path: %w", err)
}
fi, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
}
return content.Info{}, err
}
var labels map[string]string
if s.ls != nil {
labels, err = s.ls.Get(dgst)
if err != nil {
return content.Info{}, err
}
}
return s.info(dgst, fi, labels), nil
}
Manager的接口也被实现了,这里不列出了。现在是实现有了。插件在哪里使用它呢,通过在鼠标右键->查找用法(IDEA+go插件环境)找到
cmd/containerd/server/server.go文件中的
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Registration, error) {
// load all plugins into containerd
// .............
// load additional plugins that don't automatically register themselves
registry.Register(&plugin.Registration{
Type: plugins.ContentPlugin,
ID: "content",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
root := ic.Properties[plugins.PropertyRootDir]
ic.Meta.Exports["root"] = root
return local.NewStore(root)
},
})
//....................
}
第479行,return local.NewStore(root)
对store进行了实例化。
插件类型为plugins.ContentPlugin
,id为content
.到此完成了插件对接口实现的包装和注册。
plugins/content/local/store.go对store的实现可以在本地直接调用。没有涉及到客户端client发送请求调用。
客户端请求的插件同样可以在上述的loadplugins
函数中找到
clients := &proxyClients{}
for name, pp := range config.ProxyPlugins {
var (
t plugin.Type
f func(*grpc.ClientConn) interface{}
address = pp.Address
p v1.Platform
err error
)
switch pp.Type {
//........
case string(plugins.ContentPlugin), "content":
t = plugins.ContentPlugin
f = func(conn *grpc.ClientConn) interface{} {
return csproxy.NewContentStore(csapi.NewContentClient(conn))
}
//......
```
registry.Register(&plugin.Registration{
Type: t,
ID: name,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Exports = exports
ic.Meta.Platforms = append(ic.Meta.Platforms, p)
conn, err := clients.getClient(address)
if err != nil {
return nil, err
}
return f(conn), nil
},
})
第1行声明了客户端proxyClients,第17行创建了生成store的函数体。在第29行对proxyClients实例化创建了到server的连接,并在第33行调用前面声明的函数体完成初始化的逻辑,同时在第23行也实现了对插件的注册。
在这个函数里还进行了snapshot
、sandbox
、diff
插件的注册。
如果再进一步看下第18行的代码发现它是调用core/content/proxy/content_store.go中的函数func NewContentStore(client contentapi.ContentClient) content.Store{...}
可以在plugins/services/content/service.go中找到具体的grpc plugin content
func init() {
registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin,
ID: "content",
Requires: []plugin.Type{
plugins.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
cs, err := ic.GetByID(plugins.ServicePlugin, services.ContentService)
if err != nil {
return nil, err
}
return contentserver.New(cs.(content.Store)), nil
},
})
}
可以看到在第6行又依赖了plugins.ServicePlugin
插件类型。serviceplugin类型在plugins/services/content/store.go文件中可以找到
func init() {
registry.Register(&plugin.Registration{
Type: plugins.ServicePlugin,
ID: services.ContentService,
Requires: []plugin.Type{
plugins.EventPlugin,
plugins.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.GetSingle(plugins.MetadataPlugin)
if err != nil {
return nil, err
}
ep, err := ic.GetSingle(plugins.EventPlugin)
if err != nil {
return nil, err
}
s, err := newContentStore(m.(*metadata.DB).ContentStore(), ep.(events.Publisher))
return s, err
},
})
}
id为services.ContentService
的插件。并且调用插件返回content.store,并在第13行作为参数传入contentserver的new构造函数创建contentserver实例。
contentserver主要完成的接收grpc的请求然后调用store的实现。
如info功能的业务逻辑如下:
func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
dg, err := digest.Parse(req.Digest)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
}
bi, err := s.store.Info(ctx, dg)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.InfoResponse{
Info: infoToGRPC(bi),
}, nil
}
由于使用了grpc的通讯框架,content的协议定义文件为api/services/content/v1/content.proto,里面定义了消息格式
message InfoRequest {
string digest = 1;
}
message InfoResponse {
Info info = 1;
}
和服务接口
service Content {
// Info returns information about a committed object.
//
// This call can be used for getting the size of content and checking for
// existence.
rpc Info(InfoRequest) returns (InfoResponse);
// ......
}
生成的go grpc实现的文件为:api/services/content/v1/content_grpc.pb.go 其中info功能的服务功能如下:
func _Content_Info_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InfoRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ContentServer).Info(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/containerd.services.content.v1.Content/Info",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ContentServer).Info(ctx, req.(*InfoRequest))
}
return interceptor(ctx, in, info, handler)
}
在第7行、第14行调用了上述插件中的contentserver的info实现。
此handler也封装到了Content_ServiceDesc
结构中然后通过此文件中的
func RegisterContentServer(s grpc.ServiceRegistrar, srv ContentServer) {
s.RegisterService(&Content_ServiceDesc, srv)
}
函数封装了contentserver,此函数也被plugins/services/content/contentserver/contentserver.go中的
func (s *service) Register(server *grpc.Server) error {
api.RegisterContentServer(server, s)
return nil
}
调用,此函数在插件加载完成后又被server注册到本地缓存中具体见文章开始的cmd/containerd/server/server.go中的new函数代码段:
for _, p := range loaded {
id := p.URI()
log.G(ctx).WithFields(log.Fields{"id": id, "type": p.Type}).Info("loading plugin")
var mustSucceed int32
initContext := plugin.NewContext(
ctx,
initialized,
map[string]string{
plugins.PropertyRootDir: filepath.Join(config.Root, id),
plugins.PropertyStateDir: filepath.Join(config.State, id),
plugins.PropertyGRPCAddress: config.GRPC.Address,
plugins.PropertyTTRPCAddress: config.TTRPC.Address,
},
)
initContext.RegisterReadiness = func() func() {
atomic.StoreInt32(&mustSucceed, 1)
return s.RegisterReadiness()
}
// load the plugin specific configuration if it is provided
if p.Config != nil {
pc, err := config.Decode(ctx, id, p.Config)
if err != nil {
return nil, err
}
initContext.Config = pc
}
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return nil, fmt.Errorf("could not add plugin result to plugin set: %w", err)
}
instance, err := result.Instance()
if err != nil {
if plugin.IsSkipPlugin(err) {
log.G(ctx).WithFields(log.Fields{"error": err, "id": id, "type": p.Type}).Info("skip loading plugin")
} else {
log.G(ctx).WithFields(log.Fields{"error": err, "id": id, "type": p.Type}).Warn("failed to load plugin")
}
if _, ok := required[id]; ok {
return nil, fmt.Errorf("load required plugin %s: %w", id, err)
}
// If readiness was registered during initialization, the plugin cannot fail
if atomic.LoadInt32(&mustSucceed) != 0 {
return nil, fmt.Errorf("plugin failed after registering readiness %s: %w", id, err)
}
continue
}
delete(required, id)
// check for grpc services that should be registered with the server
if src, ok := instance.(grpcService); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(ttrpcService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(tcpService); ok {
tcpServices = append(tcpServices, service)
}
s.plugins = append(s.plugins, result)
}
if len(required) != 0 {
var missing []string
for id := range required {
missing = append(missing, id)
}
return nil, fmt.Errorf("required plugin %s not included", missing)
}
// register services after all plugins have been initialized
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
第一行loaded表示所以加载后的插件,在29行初始化插件,第34行得到cotnent.store接口的实例,第54行把实例放到缓存grpcservices中,
最后在第75行中调用contentserver的register函数。
上面主要是grpc server端的服务逻辑。客户端的使用逻辑可以在
core/content/proxy/content_store.go文件中找到,看info函数代码:
func (pcs *proxyContentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
resp, err := pcs.client.Info(ctx, &contentapi.InfoRequest{
Digest: dgst.String(),
})
if err != nil {
return content.Info{}, errdefs.FromGRPC(err)
}
return infoFromGRPC(resp.Info), nil
}
在第2行中调用grpc client代理的info方法向服务器发送请求。
项目中调用的地方不止一处。由下图可见在ctr客户端的cmd/ctr/commands/content/content.go文件中也有使用
// Nothing updated, do no clear
if len(paths) == 0 {
info, err = cs.Info(ctx, info.Digest)
} else {
info, err = cs.Update(ctx, info, paths...)
}
如第3行的info和第5行的update均是grpc通信示例。
至此从迷宫一样的代码中梳理出了一个骨架结构,是否可以学到一些设计思想呢?具体的细节功能不在展开很多,后面将就容器的创建流程在对代码进行梳理。不对的地方,请不吝批评指正!