Log collection on Kubernetes with loggie/vector/openobserve

The previous post on changes to the log collection components briefly introduced the new approach: on Kubernetes you generally either collect the containers' standard output or collect log files. Here we try out a combination of the newer components and put the setup to the test, as shown below:

image-20230811224414090.png

But before we start, we need to deploy Kafka, ZooKeeper, and Kowl.

1.kafka

Adjust the Kafka IP address to match your host:

version: "2"
services:
  zookeeper:
    container_name: zookeeper
    image: uhub.service.ucloud.cn/marksugar-k8s/zookeeper:latest
    restart: always
    ports:
      - '2182:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m      
  kafka:
    hostname: 172.16.100.151
    image: uhub.service.ucloud.cn/marksugar-k8s/kafka:2.8.1
    container_name: kafka
    user: root
    restart: always
    ports:
      - '9092:9092'
    volumes:
      - "/data/log/kafka:/bitnami/kafka"  # chmod 777 -R /data/kafka
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.100.151:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m      
  kowl:
    container_name: kowl
    # network_mode: host
    restart: always
    # image: quay.io/cloudhut/kowl:v1.5.0
    image: uhub.service.ucloud.cn/marksugar-k8s/kowl:v1.5.0
    hostname: kowl
    ports:
    - "8081:8080"
    environment:
      KAFKA_BROKERS: 172.16.100.151:9092
    volumes:
    - /etc/localtime:/etc/localtime:ro  # timezone
    depends_on:
      - kafka
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m
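
With the compose file in place, bring the stack up and make sure the broker answers. This is a minimal sketch: it assumes the file is saved as docker-compose.yml on 172.16.100.151 and that the Bitnami image ships the Kafka CLI scripts on its PATH.

docker-compose up -d
docker-compose ps
# list topics to confirm the broker is reachable on 9092
docker exec -it kafka kafka-topics.sh --bootstrap-server 172.16.100.151:9092 --list

Kowl should then be reachable in a browser at http://172.16.100.151:8081.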

2.loggie

Next, download the Helm chart from the official site, unpack it, and prepare the Loggie configuration:

VERSION=v1.4.0
helm pull https://github.com/loggie-io/installation/releases/download/$VERSION/loggie-$VERSION.tgz && tar xvzf loggie-$VERSION.tgz

Modify it according to the official configuration example to get a latest.yaml like the one below. The key things to define are the resource quotas, the mirrored image address, and the hostPath mounts pointing at the actual container runtime directories:

image: uhub.service.ucloud.cn/marksugar-k8s/loggie:v1.4.0

resources:
  limits:
    cpu: 2
    memory: 2Gi
  requests:
    cpu: 100m
    memory: 100Mi

extraArgs: {}
  # log.level: debug
  # log.jsonFormat: true

extraVolumeMounts:
  - mountPath: /var/log/pods
    name: podlogs
  - mountPath: /var/lib/docker/containers
    name: dockercontainers
  - mountPath: /var/lib/kubelet/pods
    name: kubelet

extraVolumes:
  - hostPath:
      path: /var/log/pods
      type: DirectoryOrCreate
    name: podlogs
  - hostPath:
      # path: /var/lib/docker/containers
      path: /data/containerd # actual containerd data directory
      type: DirectoryOrCreate
    name: dockercontainers
  - hostPath:
      path: /var/lib/kubelet/pods
      type: DirectoryOrCreate
    name: kubelet

extraEnvs: {}
timezone: Asia/Shanghai

## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
nodeSelector: {}

## Affinity for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
affinity: {}
# podAntiAffinity:
#   requiredDuringSchedulingIgnoredDuringExecution:
#   - labelSelector:
#       matchExpressions:
#       - key: app
#         operator: In
#         values:
#         - loggie
#     topologyKey: "kubernetes.io/hostname"

## Tolerations for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
tolerations: []
# - effect: NoExecute
#   operator: Exists
# - effect: NoSchedule
#   operator: Exists

updateStrategy:
  type: RollingUpdate

## Agent mode, ignored when aggregator.enabled is true
config:
  loggie:
    reload:
      enabled: true
      period: 10s
    monitor:
      logger:
        period: 30s
        enabled: true
      listeners:
        filesource:
          period: 10s
        filewatcher:
          period: 5m
        reload:
          period: 10s
        sink:
          period: 10s
        queue:
          period: 10s
        pipeline:
          period: 10s

    discovery:
      enabled: true
      kubernetes:
        # Choose: docker or containerd
        containerRuntime: containerd
        # Collect log files inside the container from the root filesystem of the container, no need to mount the volume
        rootFsCollectionEnabled: false
        # Automatically parse and convert the wrapped container standard output format into the original log content
        parseStdout: false
        # If set to true, it means that the pipeline configuration generated does not contain specific Pod paths and meta information,
        # and these data will be dynamically obtained by the file source, thereby reducing the number of configuration changes and reloads.
        dynamicContainerLog: false
        # Automatically add fields when selector.type is pod in logconfig/clusterlogconfig
        typePodFields:
          logconfig: "${_k8s.logconfig}"
          namespace: "${_k8s.pod.namespace}"
          nodename: "${_k8s.node.name}"
          podname: "${_k8s.pod.name}"
          containername: "${_k8s.pod.container.name}"

    http:
      enabled: true
      port: 9196

## Aggregator mode, by default is disabled
aggregator:
  enabled: false
  replicas: 2
  config:
    loggie:
      reload:
        enabled: true
        period: 10s
      monitor:
        logger:
          period: 30s
          enabled: true
        listeners:
          reload:
            period: 10s
          sink:
            period: 10s
      discovery:
        enabled: true
        kubernetes:
          cluster: aggregator
      http:
        enabled: true
        port: 9196

servicePorts:
  - name: monitor
    port: 9196
    targetPort: 9196
#  - name: grpc
#    port: 6066
#    targetPort: 6066

serviceMonitor:
  enabled: false
  ## Scrape interval. If not set, the Prometheus default scrape interval is used.
  interval: 30s
  relabelings: []
  metricRelabelings: []

Then do a dry run and install:

helm install loggie -f latest.yaml -nloggie --create-namespace --dry-run ./
helm install loggie -f latest.yaml -nloggie --create-namespace ./

By default Loggie is deployed as a DaemonSet, i.e. one agent per node.

[root@master-01 ~/loggie-io]# kubectl -n loggie get pod
NAME           READY   STATUS    RESTARTS   AGE
loggie-42rcs   1/1     Running   0          15d
loggie-56sz8   1/1     Running   0          15d
loggie-jnzrc   1/1     Running   0          15d
loggie-k5xqj   1/1     Running   0          15d
loggie-v84wf   1/1     Running   0          14d

2.1 Configuring collection

Before configuring log collection, let's first create a Pod. Suppose there is a group of Pods labeled app: linuxea; in kustomize this looks like:

commonLabels:
  app: linuxea

Then we can start configuring Loggie.

Loggie's configuration can roughly be split into global and local (per-pipeline) configuration. If there are no special requirements, the default global configuration is enough; if not, we declare different settings locally.

1. Create a Sink whose target is Kafka at 172.16.100.151:9092. We fill in the type, the broker address, and the name of the topic that will be created:

apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:  
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"

But if the cluster requires authentication, you need to configure it as follows:

apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:  
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"
    sasl:
      type: scram
      userName: <username>
      password: <password>
      algorithm: sha256

2. The LogConfig uses labels to select which Pods' logs will be collected:

    labelSelector:
      app: linuxea  # matches the Deployment's labels

Every Pod carrying the app: linuxea label will be collected.
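
For reference, a minimal Deployment carrying this label might look like the sketch below; the name and image are illustrative, not taken from the original manifests.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: production-java-demo        # illustrative name
  namespace: linuxea-dev
  labels:
    app: linuxea
spec:
  replicas: 1
  selector:
    matchLabels:
      app: linuxea
  template:
    metadata:
      labels:
        app: linuxea                # this is the label the LogConfig selector matches
    spec:
      containers:
      - name: java-demo
        image: example.registry/java-demo:latest   # illustrative image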

3. The paths entry for these Pods' logs is stdout, i.e. the Pods' standard output; if you were collecting a file directory instead, the corresponding path and glob pattern would go here.

4. Next, configure fields to describe the resource as key: value pairs:

        fields:
          topic: "java-demo"
          environment: "dev"

These custom fields are then interpolated into the Sink's topic template (with the values above, the resulting Kafka topic is pod-dev-java-demo), i.e.:

topic: "pod-${fields.environment}-${fields.topic}"

5. In interceptors we apply rate limiting, which means at most 90000 events per second will be processed:

        interceptors: |
          - type: rateLimit
            qps: 90000

6. Finally, use sinkRef to reference the Sink created earlier: sinkRef: default-kafka

The complete YAML is as follows:

apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:  
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"
---
apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: java-demo
  namespace: linuxea-dev
spec:
  selector:
    type: pod
    labelSelector:
      app: linuxea  # matches the Deployment's labels
  pipeline:
    sources: |
      - type: file
        name: production-java-demo
        paths:
        - stdout
        ignoreOlder: 12h
        workerCount: 128
        fields:
          topic: "java-demo"
          environment: "dev"
        interceptors: |
          - type: rateLimit
            qps: 90000
          - type: transformer
            actions:
              - action: jsonDecode(body)
    sinkRef: default-kafka
    interceptorRef: default
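
Apply the two manifests; the file names below are illustrative, assuming the Sink and the LogConfig were saved separately:

kubectl -n loggie apply -f sink-default-kafka.yaml
kubectl apply -f logconfig-java-demo.yaml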

Once applied, verify that the resources were created:

[root@master-01 ~/loggie-io]# kubectl -n loggie get sink
NAME            AGE
default-kafka   15d

[root@master-01 ~/loggie-io]# kubectl -n linuxea-dev get LogConfig 
NAME        POD SELECTOR        AGE
java-demo   {"app":"linuxea"}   15d

After logs start flowing, the message format seen in Kafka looks like this:

{
  "fields": {
    "containername": "java-demo",
    "environment": "dev",
    "logconfig": "java-demo",
    "namespace": "linuxea-dev",
    "nodename": "172.16.100.83",
    "podname": "production-java-demo-5cf5b97645-4xh89",
    "topic": "java-demo"
  },
  "body": "2023-08-15T22:10:22.773955049+08:00 stdout F 2023-08-15 22:10:22.773 INFO 7 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 1.492 seconds (JVM running for ..."
}
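
To double-check from the command line, you can also consume a few messages directly (again assuming the Bitnami Kafka CLI scripts inside the container):

docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server 172.16.100.151:9092 \
  --topic pod-dev-java-demo --from-beginning --max-messages 5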

image-20230815224624254.png

3.openobserve

We also need to install OpenObserve, which is where the logs will finally be consumed into. Install OpenObserve on the 172.16.100.151 node:

version: "2.2"
services:
  openobserve:
    container_name: openobserve
    restart: always
    image: public.ecr.aws/zinclabs/openobserve:latest
    ports:
      - "5080:5080"
    volumes:
    - /etc/localtime:/etc/localtime:ro  # timezone
    - /data/openobserve:/data
    environment:
    - ZO_DATA_DIR=/data
    - ZO_ROOT_USER_EMAIL=root@example.com
    - ZO_ROOT_USER_PASSWORD=Complexpass#123
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 4096m
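
Before wiring Vector up, it is worth confirming that OpenObserve accepts ingestion with a quick manual request. The _json endpoint and the array payload below are assumptions based on OpenObserve's ingestion API and may differ between versions:

curl -u 'root@example.com:Complexpass#123' \
  -H 'Content-Type: application/json' \
  -d '[{"body":"hello from curl"}]' \
  http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json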

Next we can consume from Kafka and have Vector write the logs into OpenObserve on 172.16.100.151.

4.vector

Vector takes over the role of Logstash; here its job is to consume the data from Kafka. Download the package from the Vector releases page on GitHub; I grabbed the rpm directly:

https://github.com/vectordotdev/vector/releases/download/v0.31.0/vector-0.31.0-1.x86_64.rpm

After installing it, we need to create a configuration file, vector.toml. The format is quite simple:

mv /etc/vector/vector.toml /etc/vector/vector.toml-bak
cat > /etc/vector/vector.toml << EOF
[api]
enabled = true
address = "0.0.0.0:8686"

[sources.kafka151]
type = "kafka"
bootstrap_servers = "172.16.100.151:9092"
group_id = "consumer-group-name"
topics = [ "pod-dev-java-demo" ]
[sources.kafka151.decoding]
codec = "json"

[sinks.openobserve]
type = "http"
inputs = [ "kafka151" ]
uri = "http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json"
method = "post"
auth.strategy = "basic"
auth.user = "root@example.com"
auth.password = "Complexpass#123"
compression = "gzip"
encoding.codec = "json"
encoding.timestamp_format = "rfc3339"
healthcheck.enabled = false
EOF

But if Kafka requires authentication, we need to add extra SASL settings to the source:

[sources.kafka151]
type = "kafka"
bootstrap_servers = "172.16.100.151:9092"
group_id = "consumer-group-name"
topics = [ "pod-dev-java-demo" ]
sasl.enabled = true
sasl.mechanism = "SCRAM-SHA-256"
sasl.password = "密码"
sasl.username = "用户名"
[sources.kafka151.decoding]
codec = "json"

For transforming the log content, https://playground.vrl.dev/ is handy for prototyping VRL programs.
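
For example, body still carries the containerd CRI prefix (timestamp, stream, flag) in front of the application log line. A remap transform like the sketch below could strip it; the transform name and regex are illustrative and not part of the original pipeline, and the sink's inputs would then point at parse_body instead of kafka151:

[transforms.parse_body]
type = "remap"
inputs = [ "kafka151" ]
source = '''
# split "<time> stdout F <message>" and keep only the message part
parsed, err = parse_regex(.body, r'^(?P<time>\S+) (?P<stream>stdout|stderr) (?P<flag>\S+) (?P<message>.*)$')
if err == null {
  .body = parsed.message
}
'''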

Put the file above at /etc/vector/vector.toml, then start Vector:

systemctl start vector
systemctl enable vector
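
Optionally, validate the configuration and poke the API enabled on 8686; the validate subcommand and the /health endpoint are assumptions about the Vector CLI and API, so adjust to your version:

vector validate /etc/vector/vector.toml
curl -s http://127.0.0.1:8686/health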

Note: the uri "http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json" can be read as http://172.16.100.151:5080/api/[group]/[items]/_json (in OpenObserve terms, the organization and the stream). If one team runs several projects, they can be grouped and separated this way.

Back in OpenObserve:

image-20230815232619422.png

Then click Explore:

image-20230815232643369.png

To view the logs, go to the Logs page:

image-20230815232825621.png

5.openobserve search

At this point my log fields look like this:

{
  "fields": {
    "podname": "production-java-demo-5cf5b97645-9ws4w",
    "topic": "java-demo",
    "containername": "java-demo",
    "environment": "dev",
    "logconfig": "java-demo",
    "namespace": "linuxea-dev",
    "nodename": "172.16.100.83"
  },
  "body": "2023-08-15T23:19:33.032689346+08:00 stdout F 2023-08-15 23:19:33.032 INFO 7 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 1.469 seconds (JVM running for ..."
}

If I want to search for entries whose body contains DemoApplication, the syntax is:

str_match(body, 'DemoApplication')

image-20230815233838453.png

By default only the msg, message, and logs fields are matched by full-text search. For any other field we need str_match; to match logs whose body field contains DemoApplication, use the following:

str_match(body, 'DemoApplication')

With that, a logging pipeline that can replace the traditional ELK stack is in place.