From f28a39727a76b662fca093c5b7311247e2961e20 Mon Sep 17 00:00:00 2001 From: Thomas Schwery Date: Fri, 20 Apr 2018 09:43:56 +0200 Subject: [PATCH] Add test projects --- messaging-mqtt/emitter-counter-pub.go | 30 +++++++++++++++ messaging-mqtt/emitter-print-sub.go | 51 +++++++++++++++++++++++++ messaging-mqtt/paho-print-sub.go | 39 +++++++++++++++++++ messaging-mqtt/paho-sensorrandom-pub.go | 44 +++++++++++++++++++++ 4 files changed, 164 insertions(+) create mode 100644 messaging-mqtt/emitter-counter-pub.go create mode 100644 messaging-mqtt/emitter-print-sub.go create mode 100644 messaging-mqtt/paho-print-sub.go create mode 100644 messaging-mqtt/paho-sensorrandom-pub.go diff --git a/messaging-mqtt/emitter-counter-pub.go b/messaging-mqtt/emitter-counter-pub.go new file mode 100644 index 0000000..66442e0 --- /dev/null +++ b/messaging-mqtt/emitter-counter-pub.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + emitter "github.com/emitter-io/go" + "time" +) + +func main() { + // Create the options with default values + o := emitter.NewClientOptions() + o.AddBroker("tcp://localhost:1883") + o.SetClientID("go-emitter-publish") + + // Create a new emitter client and connect to the broker + c := emitter.NewClient(o) + sToken := c.Connect() + if sToken.Wait() && sToken.Error() != nil { + panic("Error on Client.Connect(): " + sToken.Error().Error()) + } + + for i := 0; i < 1000; i ++ { + text := fmt.Sprintf("Message %d", i) + token := c.Publish("abqGF3nbCeYIYzOckDrU1vOq6uuU16rb", "test/counter", text) + token.Wait() + time.Sleep(5 * time.Second) + } + + +} diff --git a/messaging-mqtt/emitter-print-sub.go b/messaging-mqtt/emitter-print-sub.go new file mode 100644 index 0000000..f17b3ce --- /dev/null +++ b/messaging-mqtt/emitter-print-sub.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + emitter "github.com/emitter-io/go" + "time" +) + +func main() { + // Create the options with default values + o := emitter.NewClientOptions() + o.AddBroker("tcp://localhost:1883") + o.SetClientID("go-emitter-subscribe") + + // Set the message handler + o.SetOnMessageHandler(func(client emitter.Emitter, msg emitter.Message) { + fmt.Printf("Received message from %s: %s\n", msg.Topic(), msg.Payload()) + }) + + o.SetOnConnectionLostHandler(func(client emitter.Emitter, err error) { + fmt.Printf("Lost connection to broker: %s\n", err.Error()) + for { + cToken := client.Connect() + if cToken.Wait() && cToken.Error() != nil { + time.Sleep(5 * time.Second) + } else { + setupSubscribe(client) + break + } + } + }) + + c := emitter.NewClient(o) + sToken := c.Connect() + if sToken.Wait() && sToken.Error() != nil { + panic("Error on Client.Connect(): " + sToken.Error().Error()) + } + + setupSubscribe(c) + + time.Sleep(600* time.Second) +} + +func setupSubscribe(client emitter.Emitter) { + sToken := client.Subscribe("CockozbDw5Bu3EKTJEfaJIBbj1OVGH5P", "test/") + + if sToken.Wait() && sToken.Error() != nil { + panic("Error on Subscribe: " + sToken.Error().Error()) + } +} + diff --git a/messaging-mqtt/paho-print-sub.go b/messaging-mqtt/paho-print-sub.go new file mode 100644 index 0000000..cf8ce93 --- /dev/null +++ b/messaging-mqtt/paho-print-sub.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + MQTT "github.com/eclipse/paho.mqtt.golang" + "os" + "time" +) + +var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + fmt.Printf("TOPIC: %s (%s)\n", msg.Topic(), msg.MessageID()) + fmt.Printf("MSG: %s\n", msg.Payload()) +} + +func main() { + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + opts.SetClientID("go-simple") + opts.SetCleanSession(false) + opts.SetDefaultPublishHandler(f) + + c := MQTT.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + + if token := c.Subscribe("test/#", 0, nil); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + time.Sleep(300 * time.Second) + + if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + c.Disconnect(250) +} diff --git a/messaging-mqtt/paho-sensorrandom-pub.go b/messaging-mqtt/paho-sensorrandom-pub.go new file mode 100644 index 0000000..aa34117 --- /dev/null +++ b/messaging-mqtt/paho-sensorrandom-pub.go @@ -0,0 +1,44 @@ +package main + +import ( + MQTT "github.com/eclipse/paho.mqtt.golang" + "math/rand" + "encoding/json" + "time" +) + +type sensordata struct { + Temperature float32 + Humidity float32 + Battery float32 + Light float32 +} + +func main() { + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + opts.SetClientID("go-sipmle-publish") + + c := MQTT.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + + rand.Seed(time.Now().UTC().UnixNano()) + + for i := 0; i < 1000; i ++ { + d := sensordata{randfun(0, 20), randfun(10, 30), randfun(0, 100), randfun(0, 255)} + text, _ := json.Marshal(d) + token := c.Publish("abqGF3nbCeYIYzOckDrU1vOq6uuU16rb/test/counter2/", 2, false, text) + if token.Wait() && token.Error() != nil { + panic(token.Error()) + } + time.Sleep(5 * time.Second) + } +} + + +func randfun(min int, max int) float32 { + t := rand.Float32() * float32(max-min) + + return float32(min) + t +}