# Apache Kafka Source

The KafkaSource reads messages stored in existing Apache Kafka topics and sends those messages as CloudEvents over HTTP to its configured sink.

The KafkaSource preserves the order of messages stored in a topic partition. It does this by waiting for a successful response from the sink before delivering the next message from the same partition.
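The per-partition ordering guarantee can be sketched as a simple delivery loop that only advances to the next message once the sink has acknowledged the previous one. This is an illustration of the semantics, not the actual dispatcher code; `send_to_sink` is a hypothetical stand-in for the HTTP POST to the sink.

```python
def deliver_partition(messages, send_to_sink, max_retries=3):
    """Deliver messages from one Kafka partition in order.

    The next message is sent only after the sink has responded
    successfully to the previous one, which is how KafkaSource
    preserves per-partition ordering.
    """
    delivered = []
    for msg in messages:
        for _attempt in range(max_retries):
            if send_to_sink(msg):  # True stands in for a 2xx response
                delivered.append(msg)
                break
        else:
            raise RuntimeError(f"sink rejected message: {msg!r}")
    return delivered


# A flaky sink that fails the first attempt for each message but
# succeeds on retry; delivery order is still preserved.
attempts = {}

def flaky_sink(msg):
    attempts[msg] = attempts.get(msg, 0) + 1
    return attempts[msg] > 1

print(deliver_partition(["m1", "m2", "m3"], flaky_sink))  # ['m1', 'm2', 'm3']
```

Note that messages from *different* partitions carry no ordering guarantee relative to each other; only each partition is delivered sequentially.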
## Install the KafkaSource controller

1. Install the KafkaSource controller by entering the following command:

    ```bash
    kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka-broker/latest/eventing-kafka-controller.yaml
    ```

2. Install the Kafka Source data plane by entering the following command:

    ```bash
    kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka-broker/latest/eventing-kafka-source.yaml
    ```

3. Verify that `kafka-controller` and `kafka-source-dispatcher` are running, by entering the following command:

    ```bash
    kubectl get deployments.apps -n knative-eventing
    ```

    Example output:

    ```
    NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
    kafka-controller          1/1     1            1           3s
    kafka-source-dispatcher   1/1     1            1           4s
    ```
## Optional: Create a Kafka topic

Note
This section assumes that you are using Strimzi to operate Apache Kafka; however, the equivalent operations can be performed using the Apache Kafka CLI or any other tool.

If you are using Strimzi:

1. Create a `KafkaTopic` YAML file:

    ```yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    ```

2. Deploy the `KafkaTopic` YAML file by running the command:

    ```bash
    kubectl apply -f <filename>.yaml
    ```

    Where `<filename>` is the name of your `KafkaTopic` YAML file.

    Example output:

    ```
    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    ```

3. Ensure that the `KafkaTopic` is running by running the command:

    ```bash
    kubectl -n kafka get kafkatopics.kafka.strimzi.io
    ```

    Example output:

    ```
    NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
    knative-demo-topic   my-cluster   3            1
    ```
## Create a Service

1. Create the `event-display` Service as a YAML file:

    ```yaml
    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    ```

2. Apply the YAML file by running the command:

    ```bash
    kubectl apply -f <filename>.yaml
    ```

    Where `<filename>` is the name of the file you created in the previous step.

    Example output:

    ```
    service.serving.knative.dev/event-display created
    ```

3. Ensure that the Service Pod is running, by running the command:

    ```bash
    kubectl get pods
    ```

    The Pod name is prefixed with `event-display`:

    ```
    NAME                                            READY   STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2     Running   0          72s
    ```
## Kafka event source

1. Modify `source/event-source.yaml` accordingly with your bootstrap servers, topics, and so on:

    ```yaml
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    ```

2. Deploy the event source:

    ```bash
    kubectl apply -f event-source.yaml
    ```

    Example output:

    ```
    kafkasource.sources.knative.dev/kafka-source created
    ```

3. Verify that the KafkaSource is ready:

    ```bash
    kubectl get kafkasource kafka-source
    ```

    Example output:

    ```
    NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
    kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h
    ```
## Verification

1. Produce a message (`{"msg": "This is a test!"}`) to the Apache Kafka topic, as in the following example:

    ```bash
    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    ```

    Tip
    If you don't see a command prompt, try pressing Enter.

2. Verify that the Service received the message from the event source:

    ```bash
    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    ```

    Example output:

    ```
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
      {
        "msg": "This is a test!"
      }
    ```
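In the output above, the `id` attribute encodes the Kafka partition and offset of the consumed record. Assuming the `partition:<p>/offset:<o>` layout shown in the example, a consumer of these events could recover both values with a small parser like the following (a sketch based on the observed format, not an official API):

```python
def parse_kafka_event_id(event_id):
    """Split a KafkaSource CloudEvent id such as
    'partition:0/offset:564' into (partition, offset)."""
    partition_part, offset_part = event_id.split("/")
    partition = int(partition_part.split(":", 1)[1])
    offset = int(offset_part.split(":", 1)[1])
    return partition, offset

print(parse_kafka_event_id("partition:0/offset:564"))  # (0, 564)
```

Because partition and offset uniquely identify a record within a topic, this id can also be used to deduplicate redelivered events on the sink side.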
## Optional: Specify the key deserializer

When the KafkaSource receives a message from Kafka, it places the message key in the event extension named `key`, and places the Kafka message headers in extensions prefixed with `kafkaheader`.

You can specify one of four key deserializer types:

- `string` (default) for UTF-8 encoded strings
- `int` for 32-bit and 64-bit signed integers
- `float` for 32-bit and 64-bit floating point numbers
- `byte-array` for a Base64-encoded byte array

To specify the key deserializer, add the label `kafkasources.sources.knative.dev/key-type` to the `KafkaSource` definition, as shown in the following example:
```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
  labels:
    kafkasources.sources.knative.dev/key-type: int
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```
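To make the four `key-type` options concrete, the following sketch shows plausible decodings of raw key bytes for each type. This is my own illustration, not KafkaSource's actual implementation; the numeric cases assume big-endian encoding, as produced by Kafka's standard integer and double serializers.

```python
import base64
import struct

def decode_key(raw: bytes, key_type: str):
    """Decode a raw Kafka key according to a key-type label.

    Illustrative only: shows what each key-type label means for
    the bytes on the wire.
    """
    if key_type == "string":
        return raw.decode("utf-8")
    if key_type == "int":
        # Kafka's standard serializers write big-endian 4- or 8-byte ints.
        return struct.unpack(">i" if len(raw) == 4 else ">q", raw)[0]
    if key_type == "float":
        return struct.unpack(">f" if len(raw) == 4 else ">d", raw)[0]
    if key_type == "byte-array":
        # Arbitrary bytes are surfaced as Base64 text.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unknown key type: {key_type}")

print(decode_key(b"order-42", "string"))         # order-42
print(decode_key(struct.pack(">i", 42), "int"))  # 42
print(decode_key(b"\x01\x02", "byte-array"))     # AQI=
```

If the key bytes do not match the declared type (for example, a 3-byte key labeled `int`), decoding fails, so the label should match whatever serializer the producer used.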
## Optional: Specify the initial offset

By default, the KafkaSource starts consuming from the latest offset in each partition. If you want to consume from the earliest offset, set the `initialOffset` field to `earliest`, for example:
```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: knative-group
  initialOffset: earliest
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```
Note
The valid values for `initialOffset` are `earliest` and `latest`. Any other value results in a validation error. This field is honored only if there are no committed offsets for that consumer group.
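The note above can be summarized as a small sketch of how a consumer picks its starting position: a committed offset for the consumer group always wins, and `initialOffset` applies only when none exists. This illustrates the semantics only, not the dispatcher's actual code.

```python
def resolve_start_offset(committed, initial_offset, earliest=0, latest=0):
    """Pick the starting offset for one partition.

    committed      -- last committed offset for the consumer group, or None
    initial_offset -- 'earliest' or 'latest' (anything else is invalid)
    earliest/latest -- current log boundaries for the partition
    """
    if initial_offset not in ("earliest", "latest"):
        raise ValueError(f"invalid initialOffset: {initial_offset!r}")
    if committed is not None:
        # initialOffset is ignored once the group has committed offsets.
        return committed
    return earliest if initial_offset == "earliest" else latest

print(resolve_start_offset(None, "earliest", earliest=0, latest=500))  # 0
print(resolve_start_offset(42, "earliest", earliest=0, latest=500))    # 42
```

In practice this means changing `initialOffset` on an existing KafkaSource has no effect unless you also reset or change the consumer group.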
## Connecting to a TLS-enabled Kafka broker

The KafkaSource supports TLS and SASL authentication methods. Enabling TLS authentication requires the following files:

- CA certificate
- Client certificate and key

The KafkaSource expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM.
1. Create the certificate files as secrets in the namespace where the KafkaSource is going to be set up, by running the commands:

    ```bash
    kubectl create secret generic cacert --from-file=caroot.pem
    ```

    ```bash
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    ```

2. Apply the KafkaSource. Modify the `bootstrapServers` and `topics` fields accordingly:

    ```yaml
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source-with-tls
    spec:
      net:
        tls:
          enable: true
          cert:
            secretKeyRef:
              key: tls.crt
              name: kafka-secret
          key:
            secretKeyRef:
              key: tls.key
              name: kafka-secret
          caCert:
            secretKeyRef:
              key: caroot.pem
              name: cacert
      consumerGroup: knative-group
      bootstrapServers:
        - my-secure-kafka-bootstrap.kafka:443
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    ```
## Enable SASL for KafkaSources

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, you must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

### Prerequisites

- You have access to a Kafka cluster that has Simple Authentication and Security Layer (SASL) enabled.

### Procedure

1. Create a secret that uses the Kafka cluster's SASL information, by running the following commands:

    ```bash
    STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index .data "ca.crt"}}' | base64 --decode )
    ```

    ```bash
    SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index .data "password"}}' | base64 --decode )
    ```

    ```bash
    kubectl create secret -n default generic <secret_name> \
      --from-literal=ca.crt="$STRIMZI_CRT" \
      --from-literal=password="$SASL_PASSWD" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="example-user"
    ```

2. Create or modify a KafkaSource so that it contains the following spec options:

    ```yaml
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
      ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <secret_name>
              key: user
          password:
            secretKeyRef:
              name: <secret_name>
              key: password
          type:
            secretKeyRef:
              name: <secret_name>
              key: saslType
        tls:
          enable: true
          caCert:
            secretKeyRef:
              name: <secret_name>
              key: ca.crt
      ...
    ```

    Where `<secret_name>` is the name of the secret generated in the previous step.
## Cleanup steps

1. Delete the Kafka event source:

    ```bash
    kubectl delete -f source/source.yaml
    ```

    Example output:

    ```
    kafkasource.sources.knative.dev "kafka-source" deleted
    ```

2. Delete the `event-display` Service:

    ```bash
    kubectl delete -f source/event-display.yaml
    ```

    Example output:

    ```
    service.serving.knative.dev "event-display" deleted
    ```

3. Optional: Remove the Apache Kafka topic:

    ```bash
    kubectl delete -f kafka-topic.yaml
    ```

    Example output:

    ```
    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    ```