Apache Kafka Sink¶
This page shows how to install and configure an Apache Kafka Sink.
Prerequisites¶
You must have access to a Kubernetes cluster with Knative Eventing installed.
Installation¶
1. Install the Kafka controller:

    ```bash
    kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka-broker/latest/eventing-kafka-controller.yaml
    ```
2. Install the KafkaSink data plane:

    ```bash
    kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka-broker/latest/eventing-kafka-sink.yaml
    ```
3. Verify that the `kafka-controller` and `kafka-sink-receiver` Deployments are running (a scriptable readiness check is sketched after this list):

    ```bash
    kubectl get deployments.apps -n knative-eventing
    ```

    Example output:

    ```
    NAME                  READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller   1/1     1            1           10s
    eventing-webhook      1/1     1            1           9s
    kafka-controller      1/1     1            1           3s
    kafka-sink-receiver   1/1     1            1           5s
    ```
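If you want a check that blocks until the data plane is ready, for example in an install script, `kubectl wait` can do it; this is a minimal sketch, assuming the Deployment names shown above:

```bash
# Block (up to 5 minutes) until both Deployments report the Available condition.
kubectl wait deployment/kafka-controller deployment/kafka-sink-receiver \
  -n knative-eventing \
  --for=condition=Available \
  --timeout=300s
```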
KafkaSink example¶
A KafkaSink object looks similar to the following:
```yaml
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
```
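A KafkaSink is addressable, so Knative event sources can deliver events to it by reference. As an illustration (the PingSource below is an assumption for the example, not part of this page's installation), a source targeting the sink could look like this sketch:

```yaml
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-to-kafka
  namespace: default
spec:
  # Emit a CloudEvent every minute.
  schedule: "* * * * *"
  contentType: application/json
  data: '{"message": "Hello, Kafka!"}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: my-kafka-sink
```

Each event the source emits is written to the `mytopic` topic by the sink's receiver.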
Output topic content mode¶
The CloudEvent specification defines two content modes for transferring events: structured and binary.
A "structured-mode message" is one where the event is fully encoded using a stand-alone event format and stored in the message body.
The structured content mode keeps event metadata and data together in the payload, allowing simple forwarding of the same event across multiple routing hops, and across multiple protocols.
A "binary-mode message" is one where the event data is stored in the message body, and event attributes are stored as part of message meta-data.
The binary content mode accommodates any shape of event data, and allows for efficient transfer without transcoding effort.
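To see the difference on the wire, you can inspect records on the output topic. The following is a sketch, assuming a Strimzi-style Kafka pod named `my-cluster-kafka-0` in the `kafka` namespace and a Kafka release recent enough to support the `print.headers` formatter property:

```bash
# Consume from the output topic and print record headers alongside values.
kubectl exec -n kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic mytopic \
  --from-beginning \
  --property print.headers=true
```

In binary mode the event attributes appear as `ce_`-prefixed record headers and the record value carries only the event data; in structured mode the record value is the complete JSON-encoded event.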
A KafkaSink object with a specified `contentMode` looks similar to the following:
```yaml
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  # CloudEvent content mode of Kafka messages sent to the topic.
  # Possible values:
  # - structured
  # - binary
  #
  # default: binary.
  #
  # CloudEvent spec references:
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
  contentMode: binary # or structured
```
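Once the sink is ready, its ingress URL is published in the resource status, and any HTTP client can POST CloudEvents to it. This is a minimal sketch run from inside the cluster; the event attributes are illustrative:

```bash
# Look up the sink's ingress URL from its status.
SINK_URL=$(kubectl get kafkasink my-kafka-sink -o jsonpath='{.status.address.url}')

# Send a CloudEvent in binary HTTP mode (attributes travel as ce-* headers).
# Run this from a pod in the cluster, since the URL is cluster-internal.
curl -v "$SINK_URL" \
  -X POST \
  -H "ce-specversion: 1.0" \
  -H "ce-type: dev.example.test" \
  -H "ce-source: /my/curl/test" \
  -H "ce-id: 1234-5678" \
  -H "content-type: application/json" \
  -d '{"message": "Hello, Kafka!"}'
```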
Security¶
Knative supports the following Apache Kafka security features:
- Authentication using SASL without encryption
- Authentication using SASL and encryption using SSL
- Authentication and encryption using SSL
- Encryption using SSL without client authentication
Enabling security features¶
To enable security features, you can reference a Secret in the KafkaSink spec:
```yaml
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  auth:
    secret:
      ref:
        name: my_secret
```
Note

The secret `my_secret` must exist in the same namespace as the KafkaSink. Certificates and keys must be in PEM format.
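If you are unsure whether a certificate or key is PEM-encoded, `openssl` can verify it before you create the secret; a quick sketch, with file names assumed:

```bash
# Each command parses the file and errors out if it is not valid PEM.
openssl x509 -in caroot.pem -noout -subject
openssl pkey -in user-key.pem -noout
```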
Authentication using SASL¶
Knative supports the following SASL mechanisms:
- `PLAIN`
- `SCRAM-SHA-256`
- `SCRAM-SHA-512`
To use a specific SASL mechanism, replace `<sasl_mechanism>` with the mechanism of your choice.
Authentication using SASL without encryption¶
```bash
kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>
```
Authentication using SASL and encryption using SSL¶
```bash
kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>
```
Encryption using SSL without client authentication¶
```bash
kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true
```
Authentication and encryption using SSL¶
```bash
kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>
```
Note

The `ca.crt` can be omitted to fall back to the system's root CA set.
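For example, when the broker presents a certificate signed by a well-known CA, a client-authentication secret without `ca.crt` would look like this sketch:

```bash
# No ca.crt entry: the data plane falls back to the system's root CA set.
kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>
```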
Kafka Producer configurations¶
A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. You can change the configuration for Kafka Producers in your cluster by modifying the config-kafka-sink-data-plane
ConfigMap in the knative-eventing
namespace.
Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular, Producer configurations.
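As a sketch of what such a change can look like: recent releases keep the producer properties under a key named `config-kafka-sink-producer.properties` (an assumption; verify the key name in the ConfigMap shipped with your installation), so a fragment like the following would tighten delivery guarantees and enable compression:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka-sink-data-plane
  namespace: knative-eventing
data:
  # Key name is an assumption; check with:
  #   kubectl describe configmap -n knative-eventing config-kafka-sink-data-plane
  config-kafka-sink-producer.properties: |
    # Standard Apache Kafka producer settings.
    acks=all
    compression.type=snappy
```

You may need to restart the `kafka-sink-receiver` Deployment for the new settings to take effect.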
Enable debug logging for data plane components¶
To enable debug logging for data plane components change the logging level to DEBUG
in the kafka-config-logging
ConfigMap.
1. Create the `kafka-config-logging` ConfigMap as a YAML file that contains the following:

    ```yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    ```
2. Apply the YAML file by running the command:

    ```bash
    kubectl apply -f <filename>.yaml
    ```

    Where `<filename>` is the name of the file you created in the previous step.
3. Restart the `kafka-sink-receiver` Deployment (you can then follow the debug output as shown below):

    ```bash
    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
    ```
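Once the rollout finishes, the debug logs are written to the receiver pods' stdout; a minimal sketch for tailing them:

```bash
# Stream JSON-formatted debug logs from the receiver.
kubectl logs -n knative-eventing deployment/kafka-sink-receiver -f
```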