AWS SNS
The Simple Notification Service (SNS) integration publishes all the events to a SNS Topic to which other applications or AWS services can subscribe for further processing.
Events
The AWS Simple Notification Service integration exposes all events as documented by Event types.
Message attributes
The following message attributes are added to each published message:
event
- the event typedev_eui
- the device EUIapplication_id
- the ChirpStack Application Server application ID
Example code
The following code example demonstrates how to consume integration events using an AWS SQS subscription.
!!! important Make sure the Enable raw message delivery option is enabled on the subscription. If not enabled, the SQS messages will not have the expected attributes.
Go
main.go
:
package main
import (
"encoding/base64"
"encoding/hex"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/chirpstack/chirpstack/api/go/v4/integration"
)
type handler struct {
json bool
sqs *sqs.SQS
queueURL string
}
func (h *handler) receive() error {
for {
result, err := h.sqs.ReceiveMessage(&sqs.ReceiveMessageInput{
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: &h.queueURL,
MaxNumberOfMessages: aws.Int64(1),
})
if err != nil {
return err
}
for _, msg := range result.Messages {
_, err := h.sqs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &h.queueURL,
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
log.Printf("delete message error: %s", err)
}
event, ok := msg.MessageAttributes["event"]
if !ok || event.StringValue == nil {
log.Printf("event attribute is missing")
continue
}
switch *event.StringValue {
case "up":
err = h.up(*msg.Body)
case "join":
err = h.join(*msg.Body)
default:
log.Printf("handler for event %s is not implemented", *event.StringValue)
err = nil
}
if err != nil {
log.Printf("handling event '%s' returned error: %s", *event.StringValue, err)
}
}
}
return nil
}
func (h *handler) up(body string) error {
var up integration.UplinkEvent
if err := h.unmarshal(body, &up); err != nil {
return err
}
log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
return nil
}
func (h *handler) join(body string) error {
var join integration.JoinEvent
if err := h.unmarshal(body, &join); err != nil {
return err
}
log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
return nil
}
func (h *handler) unmarshal(body string, v proto.Message) error {
if h.json {
return protojson.UnmarshalOptions{
DiscardUnknown: true,
AllowPartial: true,
}.Unmarshal([]byte(body), v)
}
b, err := base64.StdEncoding.DecodeString(body)
if err != nil {
return err
}
return proto.Unmarshal(b, v)
}
func newHandler(json bool, accessKeyID, secretAccessKey, region, queueURL string) (*handler, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
return nil, err
}
return &handler{
json: json,
sqs: sqs.New(sess),
queueURL: queueURL,
}, nil
}
func main() {
h, err := newHandler(
// set true when using JSON encoding
false,
// AWS AccessKeyID
"...",
// AWS SecretAccessKey
"...",
// AWS region
"eu-west-1",
// SQS queue url
"https://sqs...",
)
if err != nil {
panic(err)
}
panic(h.receive())
}
Python
Please refer to the Boto3 Configuration for setting up the API credentials.
main.py
:
import boto3
from chirpstack_api import integration
from google.protobuf.json_format import Parse
class Handler:
def __init__(self, json, queue_url):
self.json = json
self.queue_url = queue_url
def receive(self):
sqs = boto3.client('sqs')
while True:
resp = sqs.receive_message(
QueueUrl=self.queue_url,
MessageAttributeNames=[
'All',
],
MaxNumberOfMessages=1,
WaitTimeSeconds=10,
)
if not 'Messages' in resp:
continue
msg = resp['Messages'][0]
receipt_handle = msg['ReceiptHandle']
sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle,
)
event = msg['MessageAttributes']['event']['StringValue']
if event == "up":
self.up(msg['Body'])
elif event == "join":
self.join(msg['Body'])
else:
print('handler for event %s is not implemented' % event)
def up(self, body):
up = self.unmarshal(body, integration.UplinkEvent())
print('Uplink received from: %s with payload: %s' % (up.device_info.dev_eui, up.data.hex()))
def join(self, body):
join = self.unmarshal(body, integration.JoinEvent())
print('Device: %s joined with DevAddr: %s' % (join.device_info.dev_eui, join.dev_addr))
def unmarshal(self, body, pl):
if self.json:
return Parse(body, pl)
pl.ParseFromString(body)
return pl
h = Handler(
# True - JSON marshaler
# False - Protobuf marshaler (binary)
False,
# SQS queue url
'https://sqs....',
)
h.receive()