In the previous post on changes to the log-collection stack, I briefly introduced the new approach: on Kubernetes you typically either collect container stdout or collect log files. Here we try out the newer tooling as a combined log-collection setup and test it, as follows.
Before we start, however, we need to deploy Kafka, Zookeeper, and Kowl.
1.kafka
Adjust the Kafka IP address to match your host (172.16.100.151 in this example):
version: "2"
services:
  zookeeper:
    container_name: zookeeper
    image: uhub.service.ucloud.cn/marksugar-k8s/zookeeper:latest
    restart: always
    ports:
      - '2182:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m
  kafka:
    hostname: 172.16.100.151
    image: uhub.service.ucloud.cn/marksugar-k8s/kafka:2.8.1
    container_name: kafka
    user: root
    restart: always
    ports:
      - '9092:9092'
    volumes:
      - "/data/log/kafka:/bitnami/kafka" # chmod 777 -R /data/log/kafka
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.100.151:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m
  kowl:
    container_name: kowl
    # network_mode: host
    restart: always
    # image: quay.io/cloudhut/kowl:v1.5.0
    image: uhub.service.ucloud.cn/marksugar-k8s/kowl:v1.5.0
    hostname: kowl
    ports:
      - "8081:8080"
    environment:
      KAFKA_BROKERS: 172.16.100.151:9092
    volumes:
      - /etc/localtime:/etc/localtime:ro # timezone
    depends_on:
      - kafka
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 2048m
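A quick way to bring the stack up and confirm the broker answers (a minimal sketch; the compose file name and the presence of kafka-topics.sh in the Bitnami image are assumptions):
# assuming the compose file above is saved as docker-compose.yaml
docker-compose up -d
# list topics through the broker container (kafka-topics.sh ships with the Bitnami Kafka image)
docker exec -it kafka kafka-topics.sh --bootstrap-server 172.16.100.151:9092 --list
Kowl should then be reachable on port 8081 of the host for browsing topics.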
2.loggie
Next, download the Helm chart as described in the official docs, extract it, and prepare the Loggie values:
VERSION=v1.4.0
helm pull https://github.com/loggie-io/installation/releases/download/$VERSION/loggie-$VERSION.tgz && tar xvzf loggie-$VERSION.tgz
Adapt the official configuration example to produce a latest.yaml like the one below. The key changes are the resource quotas, the mirrored (accelerated) image address, and the hostPath volumes pointing at the actual container runtime directories on the node:
image: uhub.service.ucloud.cn/marksugar-k8s/loggie:v1.4.0

resources:
  limits:
    cpu: 2
    memory: 2Gi
  requests:
    cpu: 100m
    memory: 100Mi

extraArgs: {}
  # log.level: debug
  # log.jsonFormat: true

extraVolumeMounts:
  - mountPath: /var/log/pods
    name: podlogs
  - mountPath: /var/lib/docker/containers
    name: dockercontainers
  - mountPath: /var/lib/kubelet/pods
    name: kubelet

extraVolumes:
  - hostPath:
      path: /var/log/pods
      type: DirectoryOrCreate
    name: podlogs
  - hostPath:
      # path: /var/lib/docker/containers
      path: /data/containerd # the actual containerd data directory
      type: DirectoryOrCreate
    name: dockercontainers
  - hostPath:
      path: /var/lib/kubelet/pods
      type: DirectoryOrCreate
    name: kubelet

extraEnvs: {}

timezone: Asia/Shanghai

## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
nodeSelector: {}

## Affinity for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
affinity: {}
  # podAntiAffinity:
  #   requiredDuringSchedulingIgnoredDuringExecution:
  #     - labelSelector:
  #         matchExpressions:
  #           - key: app
  #             operator: In
  #             values:
  #               - loggie
  #       topologyKey: "kubernetes.io/hostname"

## Tolerations for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
tolerations: []
  # - effect: NoExecute
  #   operator: Exists
  # - effect: NoSchedule
  #   operator: Exists

updateStrategy:
  type: RollingUpdate

## Agent mode, ignored when aggregator.enabled is true
config:
  loggie:
    reload:
      enabled: true
      period: 10s
    monitor:
      logger:
        period: 30s
        enabled: true
      listeners:
        filesource:
          period: 10s
        filewatcher:
          period: 5m
        reload:
          period: 10s
        sink:
          period: 10s
        queue:
          period: 10s
        pipeline:
          period: 10s
    discovery:
      enabled: true
      kubernetes:
        # Choose: docker or containerd
        containerRuntime: containerd
        # Collect log files inside the container from the root filesystem of the container, no need to mount the volume
        rootFsCollectionEnabled: false
        # Automatically parse and convert the wrapped container standard output format into the original log content
        parseStdout: false
        # If set to true, it means that the pipeline configuration generated does not contain specific Pod paths and meta information,
        # and these data will be dynamically obtained by the file source, thereby reducing the number of configuration changes and reloads.
        dynamicContainerLog: false
        # Automatically add fields when selector.type is pod in logconfig/clusterlogconfig
        typePodFields:
          logconfig: "${_k8s.logconfig}"
          namespace: "${_k8s.pod.namespace}"
          nodename: "${_k8s.node.name}"
          podname: "${_k8s.pod.name}"
          containername: "${_k8s.pod.container.name}"
    http:
      enabled: true
      port: 9196

## Aggregator mode, by default is disabled
aggregator:
  enabled: false
  replicas: 2
  config:
    loggie:
      reload:
        enabled: true
        period: 10s
      monitor:
        logger:
          period: 30s
          enabled: true
        listeners:
          reload:
            period: 10s
          sink:
            period: 10s
      discovery:
        enabled: true
        kubernetes:
          cluster: aggregator
      http:
        enabled: true
        port: 9196

servicePorts:
  - name: monitor
    port: 9196
    targetPort: 9196
  # - name: gprc
  #   port: 6066
  #   targetPort: 6066

serviceMonitor:
  enabled: false
  ## Scrape interval. If not set, the Prometheus default scrape interval is used.
  interval: 30s
  relabelings: []
  metricRelabelings: []
Then do a dry run and install:
helm install loggie -f latest.yaml -nloggie --create-namespace --dry-run ./
helm install loggie -f latest.yaml -nloggie --create-namespace ./
By default Loggie is deployed as a DaemonSet, i.e. one agent per node.
[root@master-01 ~/loggie-io]# kubectl -n loggie get pod
NAME READY STATUS RESTARTS AGE
loggie-42rcs 1/1 Running 0 15d
loggie-56sz8 1/1 Running 0 15d
loggie-jnzrc 1/1 Running 0 15d
loggie-k5xqj 1/1 Running 0 15d
loggie-v84wf 1/1 Running 0 14d
2.1 Configuring collection
Before configuring log collection, we first create a Pod. Suppose there is a group of Pods labeled app: linuxea; in kustomize that looks like this:
commonLabels:
  app: linuxea
Then we can start on the Loggie configuration.
Loggie's configuration can roughly be split into global and local (per-pipeline) configuration. If there are no special requirements, the default global configuration is sufficient; otherwise we declare the differing settings locally.
1. First, create a Sink whose destination is Kafka at 172.16.100.151:9092. We specify the type, the broker address, and the name of the topic that will be created:
apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"
But if the Kafka cluster requires authentication, you need to configure it as follows:
apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"
    sasl:
      type: scram
      userName: <username>
      password: <password>
      algorithm: sha256
2. The LogConfig uses a label selector to determine which Pods' logs will be collected, as follows:
labelSelector:
  app: linuxea # matches the Deployment's label
Every Pod carrying the app: linuxea label is collected.
3. The paths for these Pods point to the containers' standard output (stdout); for logs written to files, you would instead specify the corresponding directory and a matching pattern.
4. Next, configure fields to describe the resource as key: value pairs:
fields:
  topic: "java-demo"
  environment: "dev"
These custom fields are then picked up by the variables in the Sink's topic template, i.e.:
topic: "pod-${fields.environment}-${fields.topic}"
5. In interceptors we apply rate limiting, which means at most 90000 events per second are processed:
interceptors: |
  - type: rateLimit
    qps: 90000
6. Finally, sinkRef ties the pipeline to the Sink created earlier: sinkRef: default-kafka.
The complete YAML is as follows:
apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default-kafka
spec:
  sink: |
    type: kafka
    brokers: ["172.16.100.151:9092"]
    topic: "pod-${fields.environment}-${fields.topic}"
---
apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: java-demo
  namespace: linuxea-dev
spec:
  selector:
    type: pod
    labelSelector:
      app: linuxea # matches the Deployment's label
  pipeline:
    sources: |
      - type: file
        name: production-java-demo
        paths:
          - stdout
        ignoreOlder: 12h
        workerCount: 128
        fields:
          topic: "java-demo"
          environment: "dev"
    interceptors: |
      - type: rateLimit
        qps: 90000
      - type: transformer
        actions:
          - action: jsonDecode(body)
    sinkRef: default-kafka
    interceptorRef: default
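Save both manifests and apply them (the file name below is only an example):
kubectl apply -f loggie-kafka-sink-logconfig.yaml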
Once applied, the resources are in place:
[root@master-01 ~/loggie-io]# kubectl -n loggie get sink
NAME AGE
default-kafka 15d
[root@master-01 ~/loggie-io]# kubectl -n linuxea-dev get LogConfig
NAME POD SELECTOR AGE
java-demo {"app":"linuxea"} 15d
After logs start being written, the messages seen in Kafka look like this:
{
  "fields": {
    "containername": "java-demo",
    "environment": "dev",
    "logconfig": "java-demo",
    "namespace": "linuxea-dev",
    "nodename": "172.16.100.83",
    "podname": "production-java-demo-5cf5b97645-4xh89",
    "topic": "java-demo"
  },
  "body": "2023-08-15T22:10:22.773955049+08:00 stdout F 2023-08-15 22:10:22.773 INFO 7 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 1.492 seconds (JVM running for ..."
}
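If you want to eyeball the raw messages without opening Kowl, a console consumer against the topic works as well (a sketch; the script name assumes the Bitnami Kafka image used above):
docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server 172.16.100.151:9092 \
  --topic pod-dev-java-demo \
  --from-beginning --max-messages 5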
3.openobserve
Next we need to install OpenObserve, which is where the logs will ultimately be consumed into. We install OpenObserve on the 172.16.100.151 node:
version: "2.2"
services:
  openobserve:
    container_name: openobserve
    restart: always
    image: public.ecr.aws/zinclabs/openobserve:latest
    ports:
      - "5080:5080"
    volumes:
      - /etc/localtime:/etc/localtime:ro # timezone
      - /data/openobserve:/data
    environment:
      - ZO_DATA_DIR=/data
      - ZO_ROOT_USER_EMAIL=root@example.com
      - ZO_ROOT_USER_PASSWORD=Complexpass#123
    logging:
      driver: "json-file"
      options:
        max-size: "100M"
    mem_limit: 4096m
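Bring it up and confirm it is listening (assuming the compose file above is saved as docker-compose.yaml on that node):
docker-compose up -d openobserve
docker logs -f openobserve
The web UI should then answer on http://172.16.100.151:5080; log in with root@example.com / Complexpass#123 as configured above.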
With that in place, we can consume from Kafka and have Vector write the logs into OpenObserve on 172.16.100.151.
4.vector
Vector plays the role that Logstash used to; here its job is to consume the data from Kafka. Download the package from the releases page of Vector's GitHub repository; I grabbed the RPM directly:
https://github.com/vectordotdev/vector/releases/download/v0.31.0/vector-0.31.0-1.x86_64.rpm
After installing it, we create the configuration file vector.toml. The format is quite simple:
mv /etc/vector/vector.toml /etc/vector/vector.toml-bak
cat > /etc/vector/vector.toml << EOF
[api]
enabled = true
address = "0.0.0.0:8686"
[sources.kafka151]
type = "kafka"
bootstrap_servers = "172.16.100.151:9092"
group_id = "consumer-group-name"
topics = [ "pod-dev-java-demo" ]
[sources.kafka151.decoding]
codec = "json"
[sinks.openobserve]
type = "http"
inputs = [ "kafka151" ]
uri = "http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json"
method = "post"
auth.strategy = "basic"
auth.user = "root@example.com"
auth.password = "Complexpass#123"
compression = "gzip"
encoding.codec = "json"
encoding.timestamp_format = "rfc3339"
healthcheck.enabled = false
EOF
But if Kafka is secured, we need to add the extra SASL settings:
[sources.kafka151]
type = "kafka"
bootstrap_servers = "172.16.100.151:9092"
group_id = "consumer-group-name"
topics = [ "pod-dev-java-demo" ]
sasl.enabled = true
sasl.mechanism = "SCRAM-SHA-256"
sasl.password = "<password>"
sasl.username = "<username>"
[sources.kafka151.decoding]
codec = "json"
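Either way, the configuration can be sanity-checked before starting the service with Vector's built-in validator:
vector validate /etc/vector/vector.toml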
For processing the log content itself, https://playground.vrl.dev/ is a useful playground.
Put the file above in place as /etc/vector/vector.toml, then start the service:
systemctl start vector
systemctl enable vector
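To verify that Vector is actually consuming, you can follow the service logs, and since the [api] block is enabled in the config above, vector top shows live in/out metrics (a quick check, not required for operation):
journalctl -u vector -f
vector top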
Note the uri = "http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json": it can be read as http://172.16.100.151:5080/api/[group (organization)]/[item (stream)]/_json. For multiple projects under one project group, this gives us a convenient way to categorize them.
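The same endpoint can also be exercised by hand, which is handy for checking credentials and the group/stream naming before wiring Vector in (a sketch; the payload is a made-up test record):
curl -u 'root@example.com:Complexpass#123' \
  -XPOST 'http://172.16.100.151:5080/api/pod-dev-java-demo/default/_json' \
  -H 'Content-Type: application/json' \
  -d '[{"body":"manual test record","fields":{"topic":"java-demo"}}]'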
Back in OpenObserve, click Explore, then open the Logs view to inspect the logs.
5.openobserve search
At this point my log fields look like this:
{
  "fields": {
    "podname": "production-java-demo-5cf5b97645-9ws4w",
    "topic": "java-demo",
    "containername": "java-demo",
    "environment": "dev",
    "logconfig": "java-demo",
    "namespace": "linuxea-dev",
    "nodename": "172.16.100.83"
  },
  "body": "2023-08-15T23:19:33.032689346+08:00 stdout F 2023-08-15 23:19:33.032 INFO 7 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 1.469 seconds (JVM running for ..."
}
If what I want to search for is content whose body contains DemoApplication, the syntax is:
str_match(body, 'DemoApplication')
By default only fields such as msg, message, and log are covered by full-text matching; for any other field we have to use str_match. To match logs whose body field contains DemoApplication, we can use:
str_match(body, 'DemoApplication')
With that, a logging stack that can replace the traditional ELK setup is complete.