Azure Service-Bus
The Azure Service Bus integration publishes all the events to a Service Bus Topic or Queue to which applications can subscribe.
Events
The Azure Service Bus integration exposes all events as documented by Event types.
User properties
The following user properties are added to each published message:
event
- the event typedev_eui
- the device EUIapplication_id
- the ChirpStack Application Server application ID
Example code
The following code example demonstrates how to consume integration events using an Azure Service-Bus Queue.
Go
main.go
:
package main
import (
"context"
"encoding/hex"
"log"
servicebus "github.com/Azure/azure-service-bus-go"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/chirpstack/chirpstack/api/go/v4/integration"
)
type handler struct {
json bool
ns *servicebus.Namespace
queue *servicebus.Queue
}
func (h *handler) receive() error {
for {
if err := h.queue.Receive(context.TODO(), servicebus.HandlerFunc(h.receiveFunc)); err != nil {
return err
}
}
}
func (h *handler) receiveFunc(ctx context.Context, msg *servicebus.Message) error {
ev, ok := msg.UserProperties["Event"]
if !ok {
log.Println("event attribute is missing")
}
event, ok := ev.(string)
if !ok {
log.Println("event must be of type string")
}
var err error
switch event {
case "up":
err = h.up(msg.Data)
case "join":
err = h.join(msg.Data)
default:
log.Printf("handler for event %s is not implemented", event)
}
if err != nil {
log.Printf("handling event '%s' returned error: %s", event, err)
}
return msg.Complete(ctx)
}
func (h *handler) up(b []byte) error {
var up integration.UplinkEvent
if err := h.unmarshal(b, &up); err != nil {
return err
}
log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
return nil
}
func (h *handler) join(b []byte) error {
var join integration.JoinEvent
if err := h.unmarshal(b, &join); err != nil {
return err
}
log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
return nil
}
func (h *handler) unmarshal(b []byte, v proto.Message) error {
if h.json {
return protojson.UnmarshalOptions{
DiscardUnknown: true,
AllowPartial: true,
}.Unmarshal(b, v)
}
return proto.Unmarshal(b, v)
}
func newHandler(json bool, connStr string, queueName string) (*handler, error) {
ns, err := servicebus.NewNamespace(
servicebus.NamespaceWithConnectionString(connStr),
)
if err != nil {
panic(err)
}
queue, err := ns.NewQueue(queueName)
if err != nil {
panic(err)
}
return &handler{
json: json,
ns: ns,
queue: queue,
}, nil
}
func main() {
h, err := newHandler(
// set true when using JSON encoding
false,
// service-bus connection string
"Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=example-policy;SharedAccessKey=...",
// queue name
"events",
)
if err != nil {
panic(err)
}
panic(h.receive())
}
Python
main.py
:
from azure.servicebus import ServiceBusClient
from chirpstack_api import integration
from google.protobuf.json_format import Parse
class Handler:
def __init__(self, json, connection_string, queue_name):
self.json = json
self.connection_string = connection_string
self.queue_name = queue_name
def receive(self):
client = ServiceBusClient.from_connection_string(self.connection_string)
queue_client = client.get_queue(self.queue_name)
messages = queue_client.get_receiver()
for message in messages:
message.complete()
body = b''.join(message.body)
event = message.user_properties[b'Event']
if event == b'up':
self.up(body)
elif event == b'join':
self.join(body)
else:
print('handler for event %s is not implemented' % event)
def up(self, body):
up = self.unmarshal(body, integration.UplinkEvent())
print('Uplink received from: %s with payload: %s' % (up.device_info.dev_eui, up.data.hex()))
def join(self, body):
join = self.unmarshal(body, integration.JoinEvent())
print('Device: %s joined with DevAddr: %s' % (join.device_info.dev_eui.hex, join.dev_addr))
def unmarshal(self, body, pl):
if self.json:
return Parse(body, pl)
pl.ParseFromString(body)
return pl
h = Handler(
# True - JSON marshaler
# False - Protobuf marshaler (binary)
False,
# Service-Bus connection string
'Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=example-policy;SharedAccessKey=...',
# Service-Bus queue name
'events',
)
h.receive()