Slide 5-6 Kafka

The document provides an overview of Apache Kafka and event streaming. It discusses what event streaming is and common use cases. It then describes what a stream is in the context of event streaming. The document also discusses how to store data in Kafka and provides tutorials for installing and running Kafka on Windows and Google Colab. It includes tutorials for using the Kafka Python client and integrating Kafka with Spark structured streaming. The last part provides additional tutorials for streaming data from CSV files, video streaming using Kafka, and real-time anomaly detection with Kafka.

Distributed and Parallel Computing

Trong-Hop Do
Kafka – A distributed event streaming platform
What is event streaming?
What can I use event streaming for?
• To process payments and financial transactions in real time, such as in stock exchanges, banks, and insurance companies.

• To track and monitor cars, trucks, fleets, and shipments in real time, such as in logistics and the automotive industry.

• To continuously capture and analyze sensor data from IoT devices or other equipment, such as in factories and wind parks.

• To collect and immediately react to customer interactions and orders, such as in retail, the hotel and travel industry, and mobile applications.

• To monitor patients in hospital care and predict changes in condition to ensure timely treatment in emergencies.

• To connect, store, and make available data produced by different divisions of a company.

• To serve as the foundation for data platforms, event-driven architectures, and microservices.
What is a stream?
How is data stored in Kafka?
Tutorial 1: Kafka installation on Windows

• Download Kafka from https://kafka.apache.org/

• Unzip the downloaded file

• Rename the unzipped folder to “kafka” and move it to the C:\ drive


Kafka installation on Windows

• Open C:\kafka\config\server.properties

• Change the path of log.dirs


Kafka installation on Windows

• Open C:\kafka\config\zookeeper.properties

• Change the path of dataDir

• By default, Apache Kafka runs on port 9092 and Apache ZooKeeper runs on port 2181.
Tutorial 2: Run Apache Kafka on Windows

• Start the Kafka cluster


• Run the following command to start ZooKeeper:
cd C:\kafka\
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Run Apache Kafka on Windows

• Start the Kafka cluster


• Run the following command to start the Kafka broker:
cd C:\kafka\
.\bin\windows\kafka-server-start.bat .\config\server.properties
Run Apache Kafka on Windows

• Produce and consume some messages


• Run the kafka-topics command to create a Kafka topic named TestTopic

bin\windows\kafka-topics.bat --create --topic TestTopic --bootstrap-server localhost:9092

• Let’s create another topic named NewTopic


bin\windows\kafka-topics.bat --create --topic NewTopic --bootstrap-server localhost:9092

• Let’s list the created topics


bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Run Apache Kafka on Windows

• Produce and consume some messages

• Run the producer and the consumer in separate Command Prompt windows:

bin\windows\kafka-console-producer.bat --topic TestTopic --bootstrap-server localhost:9092


bin\windows\kafka-console-consumer.bat --topic TestTopic --from-beginning --bootstrap-server localhost:9092
Tutorial 3: Kafka Python client
• https://kafka-python.readthedocs.io/en/master/index.html

• Install Kafka-Python

• pip install kafka-python

• Start Zookeeper server and Kafka broker

• By default, ZooKeeper runs on localhost:2181 and Kafka on localhost:9092


Kafka-Python
• Run consumer code
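The consumer code appears on the slide as a screenshot; a minimal kafka-python consumer along these lines (assuming the TestTopic topic created earlier) would be:

from kafka import KafkaConsumer

# Subscribe to the topic; auto_offset_reset='earliest' replays messages
# that were produced before the consumer started
consumer = KafkaConsumer(
    'TestTopic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
)

for message in consumer:
    # message.value is raw bytes; decode it for display
    print(f"{message.topic}:{message.partition}:{message.offset} value={message.value.decode('utf-8')}")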
Kafka-Python
• Run producer code
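The producer counterpart, again as a minimal sketch (topic name assumed to match the consumer above):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# send() is asynchronous; flush() blocks until all messages are delivered
for i in range(10):
    producer.send('TestTopic', value=f'message {i}'.encode('utf-8'))
producer.flush()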
Kafka-Python
• Check the result
Tutorial 4: Run Kafka on Colab
• Download Kafka and unzip
!curl -sSOL https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
!tar -xzf kafka_2.13-3.3.1.tgz

• Start the ZooKeeper server and the Kafka server


!./kafka_2.13-3.3.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.3.1/config/zookeeper.properties
!./kafka_2.13-3.3.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.3.1/config/server.properties

• Create a topic
!./kafka_2.13-3.3.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic TestTopic
Run Kafka on Colab

• Describe the created topic


!./kafka_2.13-3.3.1/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic TestTopic

• Write some events into the topic


!./kafka_2.13-3.3.1/bin/kafka-console-producer.sh --topic TestTopic --bootstrap-server 127.0.0.1:9092

• Read the events


!./kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic TestTopic --from-beginning --bootstrap-server 127.0.0.1:9092
Run Kafka on Colab
• You can run the cells sequentially and get the result (not really streaming)
Run Kafka on Colab
• Or you can run the producer and consumer in parallel in different terminals

• Open a terminal using Xterm and run the consumer (it will be empty at first)

• Open another terminal using Xterm and run the producer; write some lines and they will appear in the consumer’s terminal
Run Kafka on Colab
• Use kafka-python on Colab
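The Colab cells are shown as screenshots; a minimal sketch of the same idea (reusing the TestTopic topic from above) is:

from kafka import KafkaProducer, KafkaConsumer

# Produce a few messages to the topic created above
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
for i in range(5):
    producer.send('TestTopic', f'colab message {i}'.encode('utf-8'))
producer.flush()

# Consume them in the same cell; consumer_timeout_ms stops iteration
# when no new message arrives, so the cell terminates
consumer = KafkaConsumer(
    'TestTopic',
    bootstrap_servers='127.0.0.1:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)
for msg in consumer:
    print(msg.value.decode('utf-8'))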
Tutorial 5: Test Kafka and Spark Structured Streaming on Colab
• Start Kafka

• Install PySpark

#currently, 3.3.0 is the latest version; you still need to pin the version explicitly
!pip install pyspark==3.3.0

from pyspark.sql import SparkSession

# pip-installed PySpark 3.3.0 is built against Scala 2.12, so the Kafka
# package must use the matching Scala version
scala_version = '2.12'
spark_version = '3.3.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.3.1',
]
spark = (SparkSession.builder
         .master("local")
         .appName("kafka-example")
         .config("spark.jars.packages", ",".join(packages))
         .getOrCreate())
spark
• Install kafka-python
!pip install kafka-python

from kafka import KafkaProducer
from json import dumps

topic_name = 'Number'
kafka_server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

for e in range(1000):
    data = {'number': e}
    producer.send(topic_name, value=data)

producer.flush()

• You can test whether the topic was sent successfully

• Create a dataframe from the Kafka topic
kafkaDf = spark.read.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", topic_name)\
.option("startingOffsets", "earliest")\
.load()
kafkaDf.show()

• Show the dataframe in a formatted way


from pyspark.sql.functions import col, concat, lit

kafkaDf.select(
    concat(col("topic"), lit(':'), col("partition").cast("string")).alias("topic_partition"),
    col("offset"),
    col("value").cast("string"),
).show()
Tutorial 6: Test Kafka and Spark Structured Streaming on Local

• Step 1: Start the Kafka cluster using the Terminal

• Step 2: Run KafkaProducer in Jupyter Notebook

from kafka import KafkaProducer
from json import dumps
from time import sleep

topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

for e in range(1000):
    data = {'number': e}
    producer.send(topic_name, value=data)
    print(str(data) + " sent")
    sleep(5)

producer.flush()
• Open another Jupyter Notebook

• You will read data from Kafka in two ways:


• Batch query
• Streaming query
• See more at https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Creating a Kafka Source for Batch Queries
• Create dataframe from Kafka data
topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

kafkaDf = (spark.read.format("kafka")
           .option("kafka.bootstrap.servers", kafka_server)
           .option("subscribe", topic_name)
           .option("startingOffsets", "earliest")
           .load())

• Show the data (converting the dataframe to pandas for a cleaner view)


• Show the streaming data using a for loop

from pyspark.sql.functions import col
from time import sleep
from IPython.display import display, clear_output

batchDF = kafkaDf.select(
    col('topic'),
    col('offset'),
    col('value').cast('string').substr(12, 1).alias('rand_number'),
)

for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        display(batchDF.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")
• Perform some data aggregation and show live results
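The aggregation itself is shown as a screenshot; one hedged example, reusing batchDF from above, is to count how often each digit appears and display the counts inside the same refresh loop:

# group by the extracted digit and count occurrences;
# place the display() call inside the refresh loop above for a live view
aggDF = batchDF.groupBy('rand_number').count().orderBy('rand_number')
display(aggDF.toPandas())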
Creating a Kafka Source for Streaming Queries
• Create Streaming dataframe from Kafka

streamRawDf = (spark.readStream.format("kafka")
               .option("kafka.bootstrap.servers", kafka_server)
               .option("subscribe", topic_name)
               .load())

streamDF = streamRawDf.select(
    col('topic'),
    col('offset'),
    col('value').cast('string').substr(12, 1).alias('rand_number'),
)
checkEvenDF = streamDF.withColumn('Is_Even', col('rand_number').cast('int') % 2 == 0)

• Write stream

from random import randint

randNum = str(randint(0, 10000))
q1name = "queryNumber" + randNum
q2name = "queryCheckEven" + randNum

stream_writer1 = (streamDF.writeStream
                  .queryName(q1name)
                  .trigger(processingTime="5 seconds")
                  .outputMode("append")
                  .format("memory"))

stream_writer2 = (checkEvenDF.writeStream
                  .queryName(q2name)
                  .trigger(processingTime="5 seconds")
                  .outputMode("append")
                  .format("memory"))

query1 = stream_writer1.start()
query2 = stream_writer2.start()
• View the streaming results
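The result view is a screenshot on the slide; since both queries write to the memory sink under their query names, a sketch of the live view is to poll those in-memory tables with spark.sql():

from time import sleep
from IPython.display import display, clear_output

# each memory-sink query is registered as an in-memory table named after
# its queryName, so it can be polled with ordinary SQL
for x in range(0, 200):
    try:
        print("Showing live view refreshed every 5 seconds")
        display(spark.sql(f"SELECT * FROM {q1name}").toPandas())
        display(spark.sql(f"SELECT * FROM {q2name}").toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        break

query1.stop()
query2.stop()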
Tutorial 7: Kafka and MongoDB on Windows
Tutorial 8
https://towardsdatascience.com/make-a-mock-real-time-stream-of-data-with-python-and-kafka-7e5e23123582
Tutorial 8: streaming from CSV

• sendStream.py
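The file is shown as a screenshot; a minimal sketch of what sendStream.py does (the file name data.csv and topic csv-stream are assumptions, not the article's exact code) is:

# sendStream.py (sketch): replay rows of a CSV file to Kafka,
# sleeping between rows to mimic a real-time stream
import csv
import json
from time import sleep
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

with open('data.csv') as f:  # assumed file name
    for row in csv.DictReader(f):
        producer.send('csv-stream', value=row)  # assumed topic name
        sleep(1)  # fixed interval; the article derives it from the row timestamps
producer.flush()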
Tutorial 8: streaming from CSV

• processStream.py
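Again a sketch rather than the article's exact code; the group_id is what lets a restarted consumer resume from its committed offsets, which a later slide demonstrates:

# processStream.py (sketch): consume the CSV rows and print them
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'csv-stream',  # assumed topic name, matching the producer sketch
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    group_id='csv-consumers',       # committed offsets enable resuming
    auto_offset_reset='earliest',
)

for message in consumer:
    print(message.value)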
Tutorial 8: streaming from CSV

Start the consumer


Tutorial 8: streaming from CSV
Start the producer
Tutorial 8: streaming from CSV
If you terminate the consumer and then restart it, streaming resumes from where it stopped
Tutorial 9

https://medium.com/@kevin.michael.horan/distributed-video-streaming-with-python-and-kafka-551de69fe1dd
Tutorial 9: Video streaming using Kafka
• Producer.py
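The slide shows Producer.py as a screenshot; a minimal sketch in the spirit of the linked article (the topic name video-stream is an assumption) reads frames with OpenCV, JPEG-encodes them, and publishes the bytes:

# Producer.py (sketch)
import sys
import cv2
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def publish_video(source, topic='video-stream'):
    # source=0 streams from the webcam; a file path streams a video file
    video = cv2.VideoCapture(source)
    while video.isOpened():
        ok, frame = video.read()
        if not ok:
            break
        ok, buffer = cv2.imencode('.jpg', frame)  # one JPEG per Kafka message
        if ok:
            producer.send(topic, buffer.tobytes())
    video.release()

if __name__ == '__main__':
    publish_video(sys.argv[1] if len(sys.argv) > 1 else 0)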
Tutorial 9: Video streaming using Kafka
• consumer.py
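A consumer sketch in the same spirit: the article serves the frames through Flask as an MJPEG stream viewable in a browser (the route and port here are assumptions):

# consumer.py (sketch): view at http://localhost:5000/video
from flask import Flask, Response
from kafka import KafkaConsumer

consumer = KafkaConsumer('video-stream', bootstrap_servers='localhost:9092')
app = Flask(__name__)

def frame_generator():
    # each Kafka message body is one JPEG frame; wrap it in multipart chunks
    for message in consumer:
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n')

@app.route('/video')
def video():
    return Response(frame_generator(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)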
Tutorial 9: Video streaming using Kafka

• Run consumer.py
Tutorial 9: Video streaming using Kafka
• Stream video from webcam
Tutorial 9: Video streaming using Kafka
• Stream a video file named Countdow1.mp4
Tutorial 10

https://towardsdatascience.com/real-time-anomaly-detection-with-apache-kafka-and-python-3a40281c01c9
Tutorial 10: real-time anomaly detection

• Producer.py
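The slide shows a screenshot; a minimal sketch of the producer side (the topic name transactions and the 5% outlier rate are assumptions) emits 2-D points with occasional injected anomalies:

# Producer.py (sketch)
import json
from time import sleep
import numpy as np
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

while True:
    x = np.random.randn(2)              # normal point
    if np.random.rand() < 0.05:         # occasionally inject an outlier
        x = x + np.random.choice([-1, 1], 2) * 8
    producer.send('transactions', {'data': x.tolist()})
    sleep(0.5)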
Tutorial 10: real-time anomaly detection

• train.py
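A sketch of the training step; the linked article uses scikit-learn's IsolationForest, and the training data and file name here are placeholders:

# train.py (sketch)
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

X_train = np.random.randn(1000, 2)     # stand-in for collected normal data
model = IsolationForest(contamination=0.05)
model.fit(X_train)
joblib.dump(model, 'isolation_forest.joblib')  # assumed file name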
Tutorial 10: real-time anomaly detection

• detector.py
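A sketch of the detector: consume each point and score it with the trained model (IsolationForest.predict() returns -1 for anomalies, 1 for inliers):

# detector.py (sketch)
import json
import joblib
from kafka import KafkaConsumer

model = joblib.load('isolation_forest.joblib')
consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for message in consumer:
    point = message.value['data']
    if model.predict([point])[0] == -1:
        print(f"ANOMALY detected: {point}")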
Tutorial 10: real-time anomaly detection
Tutorial 11: Tensorflow-IO and Kafka

https://www.tensorflow.org/io/tutorials/kafka
• Just follow https://www.tensorflow.org/io/tutorials/kafka
Tutorial 12: Spotify Recommendation System

https://www.analyticsvidhya.com/blog/2021/06/spotify-recommendation-system-using-pyspark-and-kafka-streaming/
Tutorial 13: Order book simulation
https://github.com/rongpenl/order-book-simulation
Tutorial 14: Create your own data stream
https://aiven.io/blog/create-your-own-data-stream-for-kafka-with-python-and-faker
Tutorial 15: Bigmart sale prediction
• Dataset: https://www.kaggle.com/datasets/brijbhushannanda1979/bigmart-sales-data
• Use the train set to train a simple prediction model with Spark MLlib
• Stream data from the test set to the Kafka server (remember to set a time interval between messages)
• Create a Spark streaming dataframe from Kafka and apply the trained model to get real-time predictions, as sketched below
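A minimal sketch of the last two steps, assuming a Spark MLlib PipelineModel has already been trained and saved, and that the test rows are streamed to an assumed topic bigmart as JSON with the training column names (only two illustrative fields shown):

from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

model = PipelineModel.load('bigmart_model')  # assumed model path

schema = StructType([
    StructField('Item_Identifier', StringType()),
    StructField('Item_MRP', DoubleType()),
])  # extend to the full Bigmart schema

# parse the Kafka value bytes as JSON and flatten into columns
streamDF = (spark.readStream.format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')
            .option('subscribe', 'bigmart')
            .load()
            .select(from_json(col('value').cast('string'), schema).alias('row'))
            .select('row.*'))

# apply the trained pipeline to the stream and expose live predictions
predictions = model.transform(streamDF)
query = (predictions.writeStream.queryName('bigmart_pred')
         .outputMode('append').format('memory').start())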
