In our last blog, we discussed how to create an Event Hub using the Azure Portal. In this blog, we will discuss creating a Producer for producing messages and a Consumer for consuming those messages.
Producer
The Event Hub Producer produces messages and sends them to the Event Hub.
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    event_hub_connection_string = "<Event Hub Connection String>"
    event_hub_name = "<EventHub Name>"
    producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=event_hub_name)
    async with producer:
        for i in range(1, 100):
            # Create a batch
            event_data_batch = await producer.create_batch()
            # Add events to the batch
            event_data_batch.add(EventData(f'First New{i} event'))
            event_data_batch.add(EventData(f'Second New{i} event'))
            event_data_batch.add(EventData(f'Third New{i} event'))
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
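As a side note, rather than creating a new batch for every three events, a single batch can be filled until it reaches the size limit: add() raises a ValueError once an event no longer fits, at which point the batch is sent and a fresh one is started. Below is a minimal sketch of that pattern (not from the original post), using the same connection-string and Event Hub name placeholders as above.

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    producer = EventHubProducerClient.from_connection_string(
        conn_str="<Event Hub Connection String>",
        eventhub_name="<EventHub Name>")
    async with producer:
        event_data_batch = await producer.create_batch()
        for i in range(1, 100):
            try:
                # Keep filling the current batch while it has room.
                event_data_batch.add(EventData(f'New{i} event'))
            except ValueError:
                # The batch is full: send it and start a new one for this event.
                await producer.send_batch(event_data_batch)
                event_data_batch = await producer.create_batch()
                event_data_batch.add(EventData(f'New{i} event'))
        # Send whatever is left in the last batch.
        if len(event_data_batch) > 0:
            await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())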
Consumer
The Consumer reads the messages sent to the Event Hub. Notice that while creating the client I pass consumer_group as "$Default"; every Event Hub comes with this default consumer group, and the Consumer reads messages through whichever consumer group you specify.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Azure Blob Storage details, needed only if you want to persist checkpoints in a blob container.
    azure_storage_connection_string = "<AzureStorageConnectionString>"
    azure_blob_container_name = "<Blob Container Name>"
    # Event Hub details, needed in both cases.
    event_hub_namespace_connection_string = "<EventHubNamespaceConnectionString>"
    event_hub_name = "<EventHubName>"
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(azure_storage_connection_string, azure_blob_container_name)
    # Create a consumer client for the event hub (without persistent checkpointing).
    client = EventHubConsumerClient.from_connection_string(event_hub_namespace_connection_string, consumer_group="$Default", eventhub_name=event_hub_name)
    # Consumer client with checkpoints persisted to Blob Storage:
    # client = EventHubConsumerClient.from_connection_string(event_hub_namespace_connection_string, consumer_group="$Default", eventhub_name=event_hub_name, checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1").
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())
OUTPUT
Running the consumer prints each received event body together with the ID of the partition it was read from.
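One optional addition, not shown in the code above: receive() also accepts an on_error callback, so failures while receiving from a partition are at least logged instead of passing silently. A minimal sketch, to be combined with the consumer above:

async def on_error(partition_context, error):
    # partition_context is None if the error happened during load balancing
    # rather than while receiving from a specific partition.
    if partition_context:
        print("Error on partition {}: {}".format(partition_context.partition_id, error))
    else:
        print("Error during load balancing: {}".format(error))

# Pass it alongside on_event:
# await client.receive(on_event=on_event, on_error=on_error, starting_position="-1")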
If you want to consume messages with multiple consumers, you can create additional consumer groups from the portal, and each consumer group can be used in a separate consumer by specifying its name:
client = EventHubConsumerClient.from_connection_string(event_hub_namespace_connection_string, consumer_group="<consumer2>", eventhub_name=event_hub_name)
To create additional consumer groups, go to your Event Hub, click "Consumer groups" in the left-hand menu, then press "+ Consumer group" and you are done.
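To make this concrete, here is a minimal sketch (not from the original post) that runs two consumers side by side, one on $Default and one on a hypothetical second consumer group named consumer2; each consumer group gets its own independent view of the stream.

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

event_hub_namespace_connection_string = "<EventHubNamespaceConnectionString>"
event_hub_name = "<EventHubName>"

async def on_event(partition_context, event):
    # Show which consumer group received the event.
    print("[{}] partition {}: {}".format(partition_context.consumer_group, partition_context.partition_id, event.body_as_str(encoding='UTF-8')))

async def consume(consumer_group):
    client = EventHubConsumerClient.from_connection_string(event_hub_namespace_connection_string, consumer_group=consumer_group, eventhub_name=event_hub_name)
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

async def main():
    # Run both consumers concurrently; each reads the full stream independently.
    await asyncio.gather(consume("$Default"), consume("consumer2"))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())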
Hope you now have a clear understanding of writing an Event Hub Producer and Consumer. In the next blog of this series, I will discuss integrating Apache Spark with Event Hub.
If you like this blog, please show your appreciation by hitting the like button and sharing it. Also, drop a comment with any feedback or suggested improvements. Till then, HAPPY LEARNING.