
Blog Post Recommendation | Apache Pulsar: Fast Log Search with Log4j2 + Kafka + ELK

xs · ApachePulsar · 2021-10-18

This article is reposted from the WeChat account StreamCloudNative. The author, Xue Song, is a senior software engineer at Newland Software.

Edited by Jipai, StreamNative.

About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation cloud-native distributed messaging and streaming platform. It unifies messaging, storage, and lightweight serverless functions in one system, with an architecture that separates compute from storage. It supports multi-tenancy, persistent storage, and cross-region replication across data centers, and offers strong consistency, high throughput, low latency, and high scalability for streaming data.

Many large internet and traditional-industry companies at home and abroad have adopted Apache Pulsar, with use cases spanning AI, finance, telecom operators, live streaming and short video, IoT, retail and e-commerce, online education, and more; adopters include the US cable giant Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.

Background

As a cloud-native distributed messaging system, Apache Pulsar consists of multiple components, such as ZooKeeper, bookie, broker, functions-worker, and proxy, all deployed in a distributed fashion across multiple hosts, so each component's log files are also scattered across those hosts. When a component has a problem, the logs are so dispersed that checking every service for errors means going through them one by one, which is cumbersome. The usual approach is to run grep, awk, and similar commands directly against the log files to extract the needed information. But as applications and services grow, and with them the number of supporting nodes, this traditional approach exposes many problems: it is inefficient, huge log volumes are hard to archive, full-text search is slow, and multi-dimensional queries are impossible. We therefore wanted to aggregate and monitor the logs so we can quickly find error messages from each Pulsar service and troubleshoot in a targeted, direct way.

To solve the log-search problem, our team considered a centralized log collection system that collects, manages, and accesses the logs of all Pulsar nodes in one place.

A complete centralized logging system needs the following capabilities:

• Collection: gather log data from multiple sources;
• Transport: ship log data to the central system reliably;
• Storage: store the log data;
• Analysis: support analysis through a UI;
• Alerting: provide error reporting and a monitoring mechanism.

ELK provides a complete, fully open-source solution whose components work together seamlessly, efficiently covering many scenarios, and it is one of today's mainstream logging stacks. Our company has a home-grown big-data management platform through which we deploy and manage ELK, and in production ELK already supports several business systems. ELK stands for three open-source projects: Elasticsearch, Logstash, and Kibana. The latest versions have been renamed the Elastic Stack and add the Beats project, which includes Filebeat, a lightweight log collection agent; Filebeat uses few resources and is well suited to gathering logs on each server and shipping them to Logstash.

As the figure above shows, this collection model has two problems for Pulsar:

• Every host that runs a Pulsar service must also run a Filebeat service;
• Pulsar service logs must first be written to files on local disk, consuming the host's disk I/O.

We therefore implemented fast log search for Apache Pulsar with Log4j2 + Kafka + ELK. Log4j2 supports sending logs to Kafka out of the box: using the KafkaAppender that ships with the Kafka client, the corresponding settings in the Log4j2 configuration file are all that is needed to stream Log4j2 output to Kafka in real time.

As shown in the figure below:

Implementation

The following walks through the solution in detail, using Pulsar 2.6.2 as the example.

I. Preparation

First, decide which fields will be used to search logs in Kibana; these fields can then be aggregated and queried along multiple dimensions. Elasticsearch tokenizes log records on these fields and builds the index.

As shown in the figure above, we define eight index fields for Pulsar logs: cluster name, hostname, host IP, component name, log content, system time, log level, and cluster instance.

II. Implementation steps

Note: to keep Pulsar's original configuration files and scripts intact, we implement this solution by adding new configuration files and script files.

1. Add configuration files

Add the following two configuration files to the {PULSAR_HOME}/conf directory:

1) logenv.sh
This file passes the JVM options that the Pulsar components need at startup into the Pulsar service's Java process. Example contents:

KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1

These fields mean the following:

• KAFKA_CLUSTER: the Kafka broker list;
• PULSAR_CLUSTER: the Pulsar cluster name;
• PULSAR_TOPIC: the Kafka topic that receives the Pulsar service logs;
• HOST_IP: the IP of the Pulsar host;
• PULSAR_MODULE_INSTANCE_ID: the instance ID of the Pulsar service; one host may run services of multiple Pulsar clusters, which are distinguished by this ID.

2) log4j2-kafka.yaml
This configuration file is copied from log4j2.yaml, with the following changes on top of it. (Note: in the screenshots below, log4j2.yaml is on the left and log4j2-kafka.yaml on the right.)

• Add the Kafka cluster broker list and define the record format Log4j2 writes to Kafka; the eight index fields in a message are separated by spaces, and Elasticsearch tokenizes the eight fields using the space as delimiter.
• Add the Kafka appender;
• Add a Failover appender;
• Change the Root and Logger entries under Loggers to asynchronous mode;
• The complete log4j2-kafka.yaml is as follows:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#



Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2

  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"

  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8

  Appenders:

    # Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"

    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"

    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"

    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
        Delete:
          basePath: ${sys:pulsar.log.dir}
          maxDepth: 2
          IfFileName:
            glob: "*/${sys:pulsar.log.file}*log.gz"
          IfLastModified:
            age: 30d

    Prometheus:
      name: Prometheus

    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          - Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  - RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName: "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern: "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                        Delete:
                          basePath: ${sys:pulsar.log.dir}
                          maxDepth: 2
                          IfFileName:
                            glob: "*/${sys:pulsar.log.file}*log.gz"
                          IfLastModified:
                            age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"

  Loggers:

    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info

    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console

      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console

    # Logger to inject filter script
    #  - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
    #    level: debug
    #    additivity: false
    #    AppenderRef:
    #      ref: "${sys:pulsar.log.appender}"
    #      ScriptFilter:
    #        onMatch: ACCEPT
    #        onMisMatch: DENY
    #        ScriptRef:
    #          ref: filter.js

Notes:

• Log shipping must be asynchronous and must never affect service performance;
• When a latency-sensitive system integrates a third-party system, the dependency must be decoupled. The Failover appender here decouples the dependency on Kafka: when Kafka crashes, logging fails over and simply writes to local disk;
• The default retryIntervalSeconds of the Log4j2 Failover appender is 1 minute. Because the switch-over is triggered by exceptions, the interval can reasonably be increased, for example to the 10 minutes used above;
• ignoreExceptions on the Kafka appender must be set to false, otherwise failover cannot be triggered;
• A major pitfall is the max.block.ms property: the default in the KafkaClient library is 60000 ms, so when Kafka is down, each attempted write takes a full minute to return an exception before failover is triggered. Under heavy load the Log4j2 queue fills quickly and subsequent logging blocks, seriously affecting the main service's response time. So set max.block.ms short enough, and make the queue long enough.

2. Add script files

Add the following two script files to the {PULSAR_HOME}/bin directory:

1) pulsar-kafka
This script is copied from the pulsar script, with the following changes on top of it. (Note: in the screenshots below, pulsar is on the left and pulsar-kafka on the right.)

• Point at log4j2-kafka.yaml;
• Source the contents of logenv.sh;
• Add OPTS entries so that the JVM options are passed to the Java process when a Pulsar component is started through the pulsar-kafka and pulsar-daemon-kafka scripts;
• The complete pulsar-kafka script is as follows:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`

DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto

# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto

# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

# Check pulsar env and load pulsar_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
    JAVA=$(which java)
    if [ $? != 0 ]; then
        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
        exit 1
    fi
else
    JAVA=$JAVA_HOME/bin/java
fi

# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
    PULSAR_JAR=$RELEASE_JAR
fi

# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
    echo "\nCouldn't find pulsar jar.";
    echo "Make sure you've run 'mvn package'\n";
    exit 1;
elif [ -e "$BUILT_JAR" ]; then
    PULSAR_JAR=$BUILT_JAR
fi

#
# find the instance locations for pulsar-functions
#

# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
    # didn't find a released jar, then search the built jar
    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
    if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
        echo "\nCouldn't find pulsar-functions java instance jar.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi

# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
    # didn't find a released python instance, then search the built python instance
    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
    if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
        echo "\nCouldn't find pulsar-functions python instance.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi

# find pulsar sql presto distribution location
check_presto_libraries() {
    if [ ! -d "${PRESTO_HOME}" ]; then
        BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
        if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
            echo "\nCouldn't find presto distribution.";
            echo "Make sure you've run 'mvn package'\n";
            exit 1;
        fi
        PRESTO_HOME=${BUILT_PRESTO_HOME}
    fi
}

pulsar_help() {
    cat <<EOF
Usage: pulsar <command>
where command is one of:

    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    proxy               Run a pulsar proxy
    websocket           Run a web socket proxy server
    functions-worker    Run a functions worker server
    sql-worker          Run a sql worker server
    sql                 Run sql CLI
    standalone          Run a broker server with local bookies and local zookeeper

    initialize-cluster-metadata     One-time metadata initialization
    delete-cluster-metadata         Delete a cluster's metadata
    initialize-transaction-coordinator-metadata     One-time transaction coordinator metadata initialization
    initialize-namespace            namespace initialization
    compact-topic       Run compaction against a topic
    zookeeper-shell     Open a ZK shell client
    broker-tool         CLI to operate a specific broker
    tokens              Utility to create authentication tokens

    help                This help message

or command is the full name of a class with a defined main() method.

Environment variables:
    PULSAR_LOG_CONF                  Log4j configuration file (default $DEFAULT_LOG_CONF)
    PULSAR_BROKER_CONF               Configuration file for broker (default: $DEFAULT_BROKER_CONF)
    PULSAR_BOOKKEEPER_CONF           Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
    PULSAR_ZK_CONF                   Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
    PULSAR_CONFIGURATION_STORE_CONF  Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
    PULSAR_DISCOVERY_CONF            Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
    PULSAR_WEBSOCKET_CONF            Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
    PULSAR_PROXY_CONF                Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
    PULSAR_WORKER_CONF               Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
    PULSAR_STANDALONE_CONF           Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
    PULSAR_PRESTO_CONF               Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
    PULSAR_EXTRA_OPTS                Extra options to be passed to the jvm
    PULSAR_EXTRA_CLASSPATH           Add extra paths to the pulsar classpath
    PULSAR_PID_DIR                   Folder where the pulsar server PID file should be stored
    PULSAR_STOP_TIMEOUT              Wait time before forcefully kill the pulsar server instance, if the stop is not successful

These variable can also be set in conf/pulsar_env.sh
EOF
}

add_maven_deps_to_classpath() {
    MVN="mvn"
    if [ "$MAVEN_HOME" != "" ]; then
        MVN=${MAVEN_HOME}/bin/mvn
    fi

    # Need to generate classpath from maven pom. This is costly so generate it
    # and cache it. Save the file into our target dir so a mvn clean will get
    # clean it up and force us create a new one.
    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
    if [ ! -f "${f}" ]
    then
        ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
    fi
    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}

if [ -d "$PULSAR_HOME/lib" ]; then
    PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
    ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
    add_maven_deps_to_classpath
    ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
    ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi

ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"

# if no args specified, show usage
if [ $# = 0 ]; then
    pulsar_help;
    exit 1;
fi

# get arguments
COMMAND=$1
shift

if [ -z "$PULSAR_WORKER_CONF" ]; then
    PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi

if [ -z "$PULSAR_BROKER_CONF" ]; then
    PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi

if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
    PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi

if [ -z "$PULSAR_ZK_CONF" ]; then
    PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi

if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
    PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi

if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
    PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi

if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
    PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi

if [ -z "$PULSAR_PROXY_CONF" ]; then
    PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi

if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
    PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi

if [ -z "$PULSAR_STANDALONE_CONF" ]; then
    PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi

if [ -z "$PULSAR_LOG_CONF" ]; then
    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi

if [ -z "$PULSAR_PRESTO_CONF" ]; then
    PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi

PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"

# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"

OPTS="-cp $PULSAR_CLASSPATH $OPTS"

OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"

# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}

# Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"

# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"

ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"

# Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
    # Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
    OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
    exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
    exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
    exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
    check_presto_libraries
    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
    check_presto_libraries
    exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
    exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
    pulsar_help;
else
    echo ""
    echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
    echo ""
    exit 1
fi

2) pulsar-daemon-kafka

This script is copied from the pulsar-daemon script, with the following changes on top of it. (Note: in the screenshots below, pulsar-daemon is on the left and pulsar-daemon-kafka on the right.)

• Source the contents of logenv.sh;
• Invoke pulsar-kafka instead of pulsar;
• The complete pulsar-daemon-kafka script is as follows:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

usage() {
    cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    websocket           Run a websocket proxy server
    functions-worker    Run a functions worker server
    standalone          Run a standalone Pulsar service
    proxy               Run a Proxy Pulsar service

where argument is one of:
    -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}

BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/..;pwd)

# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}

if [ $# = 0 ]; then
    usage
    exit 1
elif [ $# = 1 ]; then
    if [ $1 == "--help" -o $1 == "-h" ]; then
        usage
        exit 1
    else
        echo "Error: no enough arguments provided."
        usage
        exit 1
    fi
fi

startStop=$1
shift
command=$1
shift

case $command in
    (broker)
        echo "doing $startStop $command ..."
        ;;
    (bookie)
        echo "doing $startStop $command ..."
        ;;
    (zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (global-zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (configuration-store)
        echo "doing $startStop $command ..."
        ;;
    (discovery)
        echo "doing $startStop $command ..."
        ;;
    (websocket)
        echo "doing $startStop $command ..."
        ;;
    (functions-worker)
        echo "doing $startStop $command ..."
        ;;
    (standalone)
        echo "doing $startStop $command ..."
        ;;
    (proxy)
        echo "doing $startStop $command ..."
        ;;
    (*)
        echo "Error: unknown service name $command"
        usage
        exit 1
        ;;
esac

export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log

pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE

rotate_out_log ()
{
    log=$1;
    num=5;
    if [ -n "$2" ]; then
        num=$2
    fi
    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

mkdir -p "$PULSAR_LOG_DIR"

case $startStop in
    (start)
        if [ -f $pid ]; then
            if kill -0 `cat $pid` > /dev/null 2>&1; then
                echo $command running as process `cat $pid`. Stop it first.
                exit 1
            fi
        fi

        rotate_out_log $out
        echo starting $command, logging to $logfile
        echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
        pulsar=$PULSAR_HOME/bin/pulsar-kafka
        nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
        echo $! > $pid
        sleep 1; head $out
        sleep 2;
        if ! ps -p $! > /dev/null ; then
            exit 1
        fi
        ;;

    (stop)
        if [ -f $pid ]; then
            TARGET_PID=$(cat $pid)
            if kill -0 $TARGET_PID > /dev/null 2>&1; then
                echo "stopping $command"
                kill $TARGET_PID

                count=0
                location=$PULSAR_LOG_DIR
                while ps -p $TARGET_PID > /dev/null; do
                    echo "Shutdown is in progress... Please wait..."
                    sleep 1
                    count=`expr $count + 1`
                    if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
                        break
                    fi
                done

                if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
                    echo "Shutdown completed."
                fi

                if kill -0 $TARGET_PID > /dev/null 2>&1; then
                    fileName=$location/$command.out
                    $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
                    echo "Thread dumps are taken for analysis at $fileName"
                    if [ "$1" == "-force" ]
                    then
                        echo "forcefully stopping $command"
                        kill -9 $TARGET_PID >/dev/null 2>&1
                        echo Successfully stopped the process
                    else
                        echo "WARNNING : $command is not stopped completely."
                        exit 1
                    fi
                fi
            else
                echo "no $command to stop"
            fi
            rm $pid
        else
            echo no "$command to stop"
        fi
        ;;

    (*)
        usage
        exit 1
        ;;
esac

3. Add the jars the Kafka producer depends on

Add the following three jars to the {PULSAR_HOME}/lib directory on every node of the Pulsar cluster:

connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
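Before restarting the services, a quick sanity check can confirm the jars are in place. This is only a sketch: the fallback path /opt/pulsar for PULSAR_HOME is an assumption, and the jar versions are the ones listed above.

```shell
# Hypothetical pre-flight check: report which required jars are present in lib/
LIB_DIR="${PULSAR_HOME:-/opt/pulsar}/lib"
missing=0
for jar in connect-api-2.0.1.jar disruptor-3.4.2.jar kafka-clients-2.0.1.jar; do
  if [ -f "$LIB_DIR/$jar" ]; then
    echo "found   $jar"
  else
    echo "missing $jar"
    missing=$((missing + 1))
  fi
done
echo "$missing required jar(s) missing"
```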

4. Start the Pulsar services

1. To make sure the Pulsar service logs are written to Kafka correctly, first start in the foreground via bin/pulsar-kafka; once no exceptions appear, start in the background via bin/pulsar-daemon-kafka.
2. Taking the broker as an example, run:

bin/pulsar-daemon-kafka start broker

3. Checking the broker process with the ps command shows the following:

As the figure above shows, all the OPTS configured in logenv.sh have been passed into the broker process. The ${sys:...} lookups in log4j2-kafka.yaml can then use these property values to instantiate a Kafka producer, and the broker process's logs are sent to the Kafka brokers through that producer.
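Since the original screenshot is not reproduced here, the following sketch shows the kind of check involved: filtering a command line for the injected -Dpulsar.* system properties. The sample OPTS string is illustrative; in practice the input would come from something like `ps -ef | grep PulsarBrokerStarter`.

```shell
# Illustrative: count the injected pulsar.* system properties on a command line
OPTS_SAMPLE="-Dpulsar.module.instanceid=1 -Dpulsar.module.type=broker -Dkafka.cluster=192.168.0.1:9092 -Dpulsar.cluster=pulsar_cluster"
found=$(echo "$OPTS_SAMPLE" | tr ' ' '\n' | grep -c '^-Dpulsar\.')
echo "pulsar.* options found: $found"
```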

5. Verify that Pulsar logs are written to the Kafka broker

Start a Kafka consumer subscribed to the topic Log4j2 sends to. A consumed message looks like the following, with the index fields separated by spaces:

pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
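Because the fields are space-delimited in a fixed order, a record like the one above can be split back into its leading index fields with a plain `read`. This is a bash sketch; the sample line is shortened from the message above.

```shell
# Split the leading space-delimited index fields out of a sample record
line='pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] INFO , sample message'
read -r cluster host ip module instance rest <<< "$line"
echo "cluster=$cluster module=$module instance=$instance"
# prints: cluster=pulsar-cluster module=broker instance=1
```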

6. Searching logs

Open the Kibana page and search on the tokenized fields; for example:
cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"

The figure above shows the log search results for a given time range; you can also add Available fields to the results as needed. Developers and operators can thus use Kibana to analyze the causes of Pulsar service anomalies quickly and effectively, from multiple dimensions. This completes the end-to-end solution for fast log search in Apache Pulsar based on Log4j2 + Kafka + ELK.
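The article does not show the Kafka-to-Elasticsearch leg of the pipeline. A minimal Logstash pipeline along these lines could consume the topic and split the space-delimited fields; everything here (hosts, topic, index name, and the use of the dissect filter) is an assumption for illustration, not the author's actual configuration:

```
input {
  kafka {
    bootstrap_servers => "192.168.0.1:9092"
    topics => ["pulsar_topic"]
  }
}
filter {
  # Split the space-delimited index fields; %{msg} keeps the remainder
  dissect {
    mapping => {
      "message" => "%{cluster} %{hostname} %{hostip} %{module} %{instance} %{date} %{time} [%{thread}] [%{logger}] %{level} , %{msg}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.0.1:9200"]
    index => "pulsar-logs-%{+YYYY.MM.dd}"
  }
}
```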

Summary

Distributed systems and microservices are popular technical directions today. In production, as business develops and the application and service footprint expands rapidly, moving from a monolithic/vertical architecture to a distributed/microservice one is a natural choice: it reduces complexity and brings fault tolerance, independent deployment, and horizontal scaling. But it also raises new challenges, such as troubleshooting efficiency and the convenience of operational monitoring. Using Apache Pulsar as an example, this article has shown how a Java process can use Log4j2 + Kafka + ELK to make the logs of a distributed, microservice system quickly searchable, in support of effective service governance.

Related reading

Follow StreamCloudNative to discuss technology trends with the author:

•  Collecting logs into Pulsar with Elastic Beats
•  How to send log data to Apache Pulsar with Apache Flume
•  KoP officially open-sourced: native Kafka protocol support on Apache Pulsar

Contributions welcome

Did this article inspire you?

Do you have unique experience to share with the community and grow together with it?

The Apache Pulsar community welcomes contributions. Apache Pulsar and StreamNative hope to provide a platform for sharing Pulsar experience and knowledge, and to help more community members understand Pulsar in depth. Scan the QR code to add the Bot as a friend and get in touch about contributing.
