GCP Pub/Sub
The Google Cloud Platform Pub/Sub integration publishes all the events to a configurable GCP Pub/Sub topic. Using the GCP console (or APIs) you are able to create one or multiple Pub/Sub subscriptions, for integrating this with your application(s) or store the data in one of the storage options provided by the Google Cloud Platform.
Events
The GCP Pub/Sub integration exposes all events as documented by Event types.
Attributes
The following attributes are added to each Pub/Sub message:
event
: the event typedevEui
: the device EUI to which the event relates
Example code
The following code example demonstrates how to consume integration events using a GCP Pub/Sub Subscription.
Go
main.go
:
package main
import (
"context"
"encoding/hex"
"errors"
"log"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/chirpstack/chirpstack/api/go/v4/integration"
)
type handler struct {
json bool
client *pubsub.Client
subscription *pubsub.Subscription
}
func (h *handler) receive() error {
for {
if err := h.subscription.Receive(context.TODO(), h.receiveFunc); err != nil {
return err
}
}
}
func (h *handler) receiveFunc(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
event, ok := msg.Attributes["event"]
if !ok {
log.Printf("event attribute is missing")
}
var err error
switch event {
case "up":
err = h.up(msg.Data)
case "join":
err = h.join(msg.Data)
default:
log.Printf("handler for event %s is not implemented", event)
return
}
if err != nil {
log.Printf("handling event '%s' returned error: %s", event, err)
}
}
func (h *handler) up(b []byte) error {
var up integration.UplinkEvent
if err := h.unmarshal(b, &up); err != nil {
return err
}
log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
return nil
}
func (h *handler) join(b []byte) error {
var join integration.JoinEvent
if err := h.unmarshal(b, &join); err != nil {
return err
}
log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
return nil
}
func (h *handler) unmarshal(b []byte, v proto.Message) error {
if h.json {
return protojson.UnmarshalOptions{
DiscardUnknown: true,
AllowPartial: true,
}.Unmarshal(b, v)
}
return proto.Unmarshal(b, v)
}
func newHandler(json bool, projectID, subscriptionName, credentialsFile string) (*handler, error) {
var opts []option.ClientOption
if credentialsFile != "" {
opts = append(opts, option.WithCredentialsFile(credentialsFile))
}
client, err := pubsub.NewClient(context.TODO(), projectID, opts...)
if err != nil {
return nil, err
}
subscription := client.Subscription(subscriptionName)
exists, err := subscription.Exists(context.TODO())
if err != nil {
return nil, err
}
if !exists {
return nil, errors.New("subscription does not exist")
}
return &handler{
json: json,
subscription: subscription,
client: client,
}, nil
}
func main() {
h, err := newHandler(
false, // set to true when using JSON encoding
"project-id", // replace with GCP project ID
"subscription-name", // replace with GCP Pub/Sub Subscription name
"", // path to GCP credentials file
)
if err != nil {
panic(err)
}
if err := h.receive(); err != nil {
panic(err)
}
}
Python
Please refer to the Setting up authentication section for creating a service-account and setting up the credentials.
main.py
:
from chirpstack_api import integration
from google.cloud import pubsub_v1
from google.protobuf.json_format import Parse
class Handler:
def __init__(self, json, project_id, subscription_name):
self.json = json
self.project_id = project_id
self.subscription_name = subscription_name
def receive(self):
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(self.project_id, self.subscription_name)
while True:
resp = subscriber.pull(sub_path, max_messages=10)
for msg in resp.received_messages:
event = msg.message.attributes['event']
subscriber.acknowledge(sub_path, [msg.ack_id])
if event == 'up':
self.up(msg.message.data)
elif event == 'join':
self.join(msg.message.data)
else:
print('handler for event %s is not implemented' % event)
def up(self, body):
up = self.unmarshal(body, integration.UplinkEvent())
print('Uplink received from: %s with payload: %s' % (up.dev_eui.hex(), up.data.hex()))
def join(self, body):
join = self.unmarshal(body, integration.JoinEvent())
print('Device: %s joined with DevAddr: %s' % (join.dev_eui.hex(), join.dev_addr.hex()))
def unmarshal(self, body, pl):
if self.json:
return Parse(body, pl)
pl.ParseFromString(body)
return pl
h = Handler(
# True - JSON marshaler
# False - Protobuf marshaler (binary)
False,
# GCP Project ID
"project-id",
# GCP Pub/Sub Subsciption name
"subscription-name",
)
h.receive()