Seamless Event Processing in Google Cloud: Harnessing Cloud Sink, Pub/Sub, and Cloud Functions for Event Parsing and Webhook Integration

Faraz Ahmed · Published in Emumba · Sep 6, 2023

Introduction: Polling vs. Event-Driven Approach

In the realm of cloud computing, monitoring and responding to events is a fundamental challenge. Two common approaches to event processing are polling and event-driven methods. To understand their impact, let’s delve into a real-world scenario: managing and monitoring the status of instances within a Managed Instance Group (MIG) in Google Cloud.

Polling Method: Monitoring Instance Status

Imagine you have a critical application running on a MIG, and you need to ensure that your application is always aware of which instances are currently active. Using a polling method, your application periodically queries the MIG to check instance statuses (a minimal code sketch follows the steps below). The flow looks something like this:

  1. Your application repeatedly asks the MIG: “Do you still have all the instances that you should have?”
  2. If the MIG reports an instance as missing, your application takes action to update its records in the database, reflecting the instance’s absence.
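
Concretely, a polling loop might look like the sketch below. This is a minimal illustration using the google-api-python-client Compute Engine client; the project, zone, and group names are placeholders rather than values from this article’s deployment.

import time

from googleapiclient import discovery

# Placeholders: substitute your own project, zone, and MIG name.
PROJECT, ZONE, MIG_NAME = 'my-project', 'us-central1-a', 'my-mig'

compute = discovery.build('compute', 'v1')

while True:
    # Ask the MIG for its current instances and their statuses.
    result = compute.instanceGroupManagers().listManagedInstances(
        project=PROJECT, zone=ZONE, instanceGroupManager=MIG_NAME).execute()
    for instance in result.get('managedInstances', []):
        # Compare against your records and update the database as needed.
        print(instance['instance'], instance.get('instanceStatus'))
    time.sleep(60)  # Poll every minute, wasteful when nothing has changed.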

Pros of Polling:

  • Simple to implement.
  • Works with systems that do not support event-driven notifications.

Cons of Polling:

  • Inefficient: Frequent polling can consume resources and bandwidth.
  • Latency: Instances might not be updated immediately.
  • Costly: Polling generates unnecessary API calls.

Event-Driven Method: The Power of Event-Based Notifications

Now, let’s explore the event-driven method. Instead of incessantly polling the MIG, your application subscribes to real-time notifications of instance changes. When an instance’s status changes (e.g., an instance is deleted or added), the MIG emits an event, which your application listens for and acts upon, as the sketch after the example below illustrates.

Real-Life Example:

  • Event: An instance status changes within the MIG.
  • Action: Your application immediately updates its records to reflect the change in instance status.
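
To make this concrete, below is a minimal sketch of a pull subscriber built with the google-cloud-pubsub library; the project and subscription names are placeholders. In the architecture this article builds, the same role is played by a Cloud Function trigger instead of a long-running subscriber.

from google.cloud import pubsub_v1

# Placeholders: substitute your own project and subscription.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-sub')

def callback(message):
    # React to the instance-change event the moment it arrives.
    print('Received event:', message.data.decode('utf-8'))
    message.ack()

# Streams messages in real time; no polling, no idle API calls.
future = subscriber.subscribe(subscription_path, callback=callback)
future.result()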

Pros of Event-Driven:

  • Real-time and responsive.
  • Efficient: No unnecessary polling.
  • Scalable: Handles large volumes of events seamlessly.

Cons of Event-Driven:

  • Requires event sources to support push notifications.

Now that we’ve highlighted the differences between polling and event-driven approaches, let’s explore how to seamlessly implement event processing in Google Cloud using Cloud Sink, Pub/Sub, and Cloud Functions.

Implementation: Seamless Event Processing in Google Cloud

To achieve efficient and responsive event processing in Google Cloud, we’ll leverage a combination of powerful services: Cloud Sink, Pub/Sub, and Cloud Functions. But before we dive into the steps, let’s set the stage by emphasizing the significance of Infrastructure as Code (IaC) using Google’s Deployment Manager.

Harnessing the Power of Deployment Manager for IaC

IaC is a practice that allows you to define and provision your cloud infrastructure using code. Google’s Deployment Manager simplifies this process by enabling you to define and manage cloud resources declaratively. In other words, you describe the desired state of your infrastructure in code, and Deployment Manager takes care of provisioning and maintaining those resources consistently across different environments.

With Deployment Manager, you can easily manage, version, and reproduce your infrastructure. It ensures that your cloud resources are created predictably, reducing the chances of configuration drift and enhancing overall system reliability.
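
To make the pattern concrete before we build the full template, here is a minimal sketch of a Deployment Manager Python template. The bucket resource is purely illustrative and not part of this article’s deployment:

def GenerateConfig(context):
    """Declare the desired state; Deployment Manager converges to it."""
    return {'resources': [{
        # Illustrative resource; any supported type works the same way.
        'name': context.env['deployment'] + '-bucket',
        'type': 'storage.v1.bucket',
        'properties': {'location': 'US'},
    }]}

You reference a template like this from a YAML config and deploy it with the gcloud deployment-manager CLI; updating the deployment reconciles the resources toward the declared state. The steps below scale this same pattern up to a full event-processing pipeline.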

Now, let’s proceed with the steps to seamlessly implement event processing using Cloud Sink, Pub/Sub, and Cloud Functions, all orchestrated by Deployment Manager.

Step 1: Setting Up the Managed Instance Group

A Managed Instance Group is a collection of virtual machine (VM) instances in Google Cloud that allows you to easily manage and scale identical VMs for your applications. MIGs simplify the deployment and scaling of instances, making it straightforward to ensure high availability and manage large-scale applications. You define the instance template, and the MIG automatically creates and maintains instances based on that template.

COMPUTE_URL_BASE = 'https://www.googleapis.com/compute/v1/'


def GlobalComputeUrl(project, collection, name):
    """Build the URL of a global Compute Engine resource."""
    return '{}projects/{}/global/{}/{}'.format(
        COMPUTE_URL_BASE, project, collection, name)


def SubNetworksComputeUrl(project, region, name):
    """Build the URL of a regional subnetwork."""
    return '{}projects/{}/regions/{}/subnetworks/{}'.format(
        COMPUTE_URL_BASE, project, region, name)


def GenerateConfig(context):
    """Generate the Deployment Manager configuration."""

    name_prefix = context.env['deployment']

    target_size = 2
    max_size = 8

    # Define the instance template. The nested 'properties' block is the
    # actual instance definition used for every VM in the group.
    instance_template = {
        'name': name_prefix + '-instance-template',
        'type': 'compute.v1.instanceTemplate',
        'properties': {
            'zone': context.properties['zone'],
            'properties': {
                'machineType': context.properties['machine-type'],
                'metadata': {'items': [
                    {'key': 'startup-script',
                     'value': context.properties['startup-script']},
                    {'key': 'block-project-ssh-keys', 'value': 'false'},
                    {'key': 'ssh-keys',
                     'value': context.properties['ssh-keys']},
                ]},
                'serviceAccounts': [{
                    'email': context.properties['sa-email'],
                    'scopes': ['https://www.googleapis.com/auth/cloud-platform'],
                }],
                'disks': [{
                    'deviceName': 'boot',
                    'type': 'PERSISTENT',
                    'autoDelete': True,
                    'boot': True,
                    # Replace with your desired image.
                    'initializeParams': {'sourceImage': ''.join([
                        COMPUTE_URL_BASE,
                        'projects/ubuntu-os-cloud/global/images/family/ubuntu-2004-lts',
                    ])},
                }],
                'networkInterfaces': [{
                    'accessConfigs': [{'name': 'external-nat',
                                       'type': 'ONE_TO_ONE_NAT'}],
                    'network': GlobalComputeUrl(
                        context.env['project'], 'networks',
                        context.properties['vpc']),
                    'subnetwork': SubNetworksComputeUrl(
                        context.env['project'],
                        context.properties['region'],
                        context.properties['subnet_name']),
                }],
            },
        },
    }

    # Define the managed instance group, built from the template above.
    managed_instance_group = {
        'name': name_prefix + '-mig',
        'type': 'compute.v1.instanceGroupManager',
        'properties': {
            'zone': context.properties['zone'],
            'targetSize': target_size,
            'baseInstanceName': name_prefix + '-instance',
            'instanceTemplate': '$(ref.{}-instance-template.selfLink)'.format(
                name_prefix),
            'autoHealingPolicies': [{'initialDelaySec': 300}],
            'updatePolicy': {'type': 'PROACTIVE'},
        },
    }

Step 2: Setting Up Pub/Sub Topics

In the event-driven approach, Pub/Sub acts as the central hub for event notifications. Start by creating a Pub/Sub topic to serve as the destination for your parsed events. Pub/Sub topics enable real-time event-driven communication.

    # Create a Pub/Sub topic (continuing inside GenerateConfig). The IAM
    # binding lets the log sink's writer identity publish events to it.
    pub_sub_topic = {
        'name': name_prefix + '-topic',
        'type': 'pubsub.v1.topic',
        'properties': {
            'name': '{}-topic'.format(name_prefix),
            'topic': '{}-topic'.format(name_prefix),
        },
        'accessControl': {
            'gcpIamPolicy': {
                'bindings': [{
                    'role': 'roles/pubsub.publisher',
                    'members': [
                        '$(ref.' + name_prefix + '-sink.writerIdentity)',
                    ],
                }],
            },
        },
        'metadata': {
            # Create the sink first so its writerIdentity can be referenced.
            'dependsOn': [name_prefix + '-sink'],
        },
    }

    # Create a Pub/Sub subscription for the topic.
    pub_sub_subscription = {
        'name': name_prefix + '-sub',
        'type': 'gcp-types/pubsub-v1:projects.subscriptions',
        'properties': {
            'name': '{}-sub'.format(name_prefix),
            'subscription': '{}-sub'.format(name_prefix),
            'topic': '$(ref.' + name_prefix + '-topic.name)',
        },
        'metadata': {
            # Create the topic before subscribing to it.
            'dependsOn': [name_prefix + '-topic'],
        },
    }

Step 3: Creating a Cloud Sink

A Google Cloud logging sink is a powerful mechanism for routing logs and events from various sources to different destinations, including Pub/Sub topics. To capture events from a specific Managed Instance Group, create a sink whose filter matches only that group’s instances.

    # Create a log sink with the specified filter and Pub/Sub destination.
    log_sink = {
        'name': name_prefix + '-sink',
        'type': 'logging.v2.sink',
        'properties': {
            'sink': '{}-sink'.format(name_prefix),
            'destination': 'pubsub.googleapis.com/projects/{}/topics/{}-topic'.format(
                context.env['project'], name_prefix),
            # Match only admin-activity audit logs for this MIG's instances,
            # keeping just the final entry of each operation.
            'filter': ('resource.type="gce_instance" AND '
                       'logName="projects/{}/logs/cloudaudit.googleapis.com%2Factivity" AND '
                       'protoPayload.resourceName:"projects/{}/zones/{}/instances/{}-instance-" AND '
                       'operation.last="true" AND severity=NOTICE').format(
                context.env['project'], context.env['project_number'],
                context.properties['zone'], name_prefix),
            # Give the sink its own service account; the topic above grants
            # it roles/pubsub.publisher.
            'uniqueWriterIdentity': True,
        },
    }
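
For reference, each message the sink publishes is the JSON form of a Cloud Logging LogEntry. Trimmed to the fields the filter above cares about, a matching entry looks roughly like the following sketch (illustrative values, not a verbatim payload):

# Rough shape of a matching audit LogEntry, shown as a Python dict.
sample_event = {
    'logName': 'projects/my-project/logs/cloudaudit.googleapis.com%2Factivity',
    'resource': {'type': 'gce_instance'},
    'severity': 'NOTICE',
    'operation': {'last': True},
    'protoPayload': {
        'methodName': 'v1.compute.instances.delete',  # or ...insert
        'resourceName': 'projects/123456789012/zones/us-central1-a/'
                        'instances/my-deployment-instance-abcd',
    },
}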

Step 4: Triggering a Cloud Function for Webhook Integration

To send messages from Pub/Sub to a webhook, create a Cloud Function. The function is triggered by messages on the Pub/Sub topic and forwards each one to your desired webhook endpoint. Here’s a Python example:

# Function source, packaged separately into the zip referenced below.
import base64

import requests


def send_to_webhook(event, context):
    """Decode a Pub/Sub message and forward it to the webhook."""
    # Note: the function name must match the 'entryPoint' configured below.
    data = base64.b64decode(event['data']).decode('utf-8')
    # Send the data to your webhook.
    response = requests.post('YOUR_WEBHOOK_URL', data=data)
    if response.status_code == 200:
        print('Message sent successfully!')
    else:
        print(f'Failed to send message: {response.status_code}')
    # Create the Cloud Function (back inside GenerateConfig).
    cloud_function = {
        'name': name_prefix + '-cf',
        'type': 'gcp-types/cloudfunctions-v1:projects.locations.functions',
        'properties': {
            'parent': 'projects/{}/locations/{}'.format(
                context.env['project'], context.properties['region']),
            'function': name_prefix + '-cf',
            # Upload the zipped function source to a bucket beforehand.
            'sourceArchiveUrl': 'gs://<Bucket-Name>/<Code-file>.zip',
            'entryPoint': 'send_to_webhook',
            'runtime': 'python310',
            'availableMemoryMb': 256,
            # Trigger the function on every message published to the topic.
            'eventTrigger': {
                'eventType': 'google.pubsub.topic.publish',
                'resource': 'projects/{}/topics/{}-topic'.format(
                    context.env['project'], name_prefix),
            },
            # 'buildEnvironmentVariables': {
            #     'GOOGLE_FUNCTION_SOURCE': 'webhook-function.py'  # Update the path accordingly
            # },
        },
    }

    # Collect every resource and hand the list to Deployment Manager.
    resources = [
        instance_template,
        managed_instance_group,
        cloud_function,
        pub_sub_topic,
        pub_sub_subscription,
        log_sink,
    ]

    return {'resources': resources}

This Cloud Function efficiently handles real-time event processing and webhook integration.
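
Once the deployment is up, you can verify the pipeline end to end without touching the MIG by publishing a test message straight to the topic. This is a minimal sketch with the google-cloud-pubsub publisher; the project and topic names are placeholders:

from google.cloud import pubsub_v1

# Placeholders: substitute your own project and topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-deployment-topic')

# The Cloud Function receives this payload base64-encoded in event['data'].
future = publisher.publish(topic_path, b'{"test": "instance event"}')
print('Published message id:', future.result())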

In conclusion, by embracing an event-driven approach using Cloud Sink, Pub/Sub, and Cloud Functions, you can achieve efficient and responsive event processing while eliminating the inefficiencies associated with polling. This architecture ensures your applications are always aware of changes in your cloud environment and can take immediate action when needed, resulting in a more agile and cost-effective cloud infrastructure.
