// Command main bridges Raspberry Pi GPIO pins to state files and MQTT topics.
//
// Pins are described in ./config.json. Input pins are polled and every state
// change is written to the pin's file and/or published on its MQTT topic.
// Output pins are driven from the pin's file and/or from messages received on
// its MQTT topic. Optionally an embedded MQTT server (TCP plus a websocket
// front end) is started as well.
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/frickelblog/message"
	"github.com/frickelblog/surgemq/service"
	"github.com/stianeikeland/go-rpio"
)

var (
	// config is loaded once at package initialisation. Its Pins[i].State
	// fields are updated at runtime to remember the last known pin state.
	config = readConfig("./config.json")
	// MQTTClient is the client used for all publishing from this process.
	MQTTClient service.Client
	srv        *service.Server
	client     *service.Client
)

// Configuration is the schema of config.json.
//
// The Enable* switches turn the file-system bridge, the embedded MQTT server
// and the MQTT client on or off. The MQTTServer* fields give the host and
// TCP/websocket ports of the embedded server, the MQTTClient* fields the
// address of the broker the client connects to.
type Configuration struct {
	Debug             bool               `json:"debug"`
	EnableFileSystem  bool               `json:"enablefilesystem"`
	EnableMQTTServer  bool               `json:"enablemqttserver"`
	EnableMQTTClient  bool               `json:"enablemqttclient"`
	MQTTServerHost    string             `json:"mqttserverhost"`
	MQTTServerTCPPort string             `json:"mqttservertcpport"`
	MQTTServerWSPort  string             `json:"mqttserverwsport"`
	MQTTClientAddress string             `json:"mqttclientaddress"`
	MQTTClientPort    string             `json:"mqttclientport"`
	MQTTClientUser    string             `json:"mqttclientuser"`
	MQTTClientPass    string             `json:"mqttclientpass"`
	Pins              []PinConfiguration `json:"pins"`
}

// PinConfiguration describes a single GPIO pin.
//
// IO is "in" or "out"; any other value leaves the pin unconfigured. File is
// the path of the pin's state file ("" disables writing), MQTTTopic the topic
// the state is published on and subscribed to, and State the default (for
// outputs) respectively last known state, "0" or "1".
type PinConfiguration struct {
	Pin       int8   `json:"pin"`
	IO        string `json:"io"`
	File      string `json:"file"`
	MQTTTopic string `json:"mqtttopic"`
	State     string `json:"state"`
}

// main starts the optional MQTT server and client, configures the GPIO pins
// and then polls all pins forever.
func main() {
	if config.EnableMQTTServer {
		startMQTTServer()
	}

	for i, s := range config.Pins {
		fmt.Println(i, s)
	}
	fmt.Println("--------")

	if config.EnableMQTTClient {
		fmt.Println("Warte auf MQTT Client...")
		// Give a freshly started server one second to come up.
		time.Sleep(1000 * time.Millisecond)
		MQTTClient = mqttConnect()
		go heartbeatProcess(&MQTTClient, 30)
	}

	if err := rpio.Open(); err != nil {
		panic(fmt.Sprintf("unable to open gpio: %v", err))
	}
	defer rpio.Close()

	configurePins()

	// Collects one "<pin>: <state>" line per pin for the "/pins" overview topic.
	var mqttStreamOutput strings.Builder
	for {
		for i := range config.Pins {
			processPin(i, &mqttStreamOutput)
		}

		if config.EnableMQTTClient {
			publishOrLog("/pins", mqttStreamOutput.String())
		}
		mqttStreamOutput.Reset()

		// Wait 400ms between polling cycles.
		time.Sleep(400 * time.Millisecond)

		// Clear the terminal so the next cycle overwrites the pin listing.
		// Best effort only: a missing "clear" binary must not stop polling.
		cmd := exec.Command("clear")
		cmd.Stdout = os.Stdout
		_ = cmd.Run()
	}
}

// startMQTTServer creates the embedded MQTT server and starts its TCP
// listener and its websocket front end in background goroutines.
func startMQTTServer() {
	srv = &service.Server{
		KeepAlive:        300,           // seconds
		ConnectTimeout:   2,             // seconds
		SessionsProvider: "mem",         // keeps sessions in memory
		Authenticator:    "mockSuccess", // always succeed
		TopicsProvider:   "mem",         // keeps topic subscriptions in memory
	}

	// The websocket handler uses the server's TCP endpoint
	// (e.g. "tcp://127.0.0.1:1883") as its backend.
	addr := "tcp://" + config.MQTTServerHost + ":" + config.MQTTServerTCPPort
	AddWebsocketHandler("/mqtt", addr)
	go ListenAndServeWebsocket(":" + config.MQTTServerWSPort) // e.g. ":8081"

	go func() {
		if err := srv.ListenAndServe(addr); err != nil {
			log.Println(err)
		}
	}()
}

// configurePins sets the direction of every configured pin and drives output
// pins to their default state from the configuration.
func configurePins() {
	for _, pinConfig := range config.Pins {
		pin := rpio.Pin(pinConfig.Pin)
		switch pinConfig.IO {
		case "in":
			pin.Input()
			// Optional, currently unused: pin.PullUp(), pin.PullDown(),
			// pin.Detect(rpio.RiseEdge) (AnyEdge / FallEdge 1->0 / RiseEdge 0->1).
		case "out":
			pin.Output()
			// Anything other than "1" is treated as low.
			if pinConfig.State == "1" {
				pin.High()
			} else {
				pin.Low()
			}
		}
	}
}

// processPin runs one polling step for config.Pins[i]: it synchronises the
// pin with its file and MQTT topic, prints the pin state on the console and
// appends a "<pin>: <state>\r\n" line to out.
//
// NOTE(review): config.Pins[i].State is also written by mqttOnPublishFunc. If
// the MQTT library invokes that callback on another goroutine this is
// unsynchronised access - TODO confirm and guard if necessary.
func processPin(i int, out *strings.Builder) {
	pinConfig := config.Pins[i]
	pin := rpio.Pin(pinConfig.Pin)

	switch pinConfig.IO {
	case "in":
		newState := stringState(pin.Read())
		if newState != pinConfig.State {
			// Remember the new state, then mirror it to file and MQTT.
			config.Pins[i].State = newState
			if config.EnableFileSystem {
				writePinFileState(pinConfig.File, newState)
			}
			if config.EnableMQTTClient {
				publishOrLog(pinConfig.MQTTTopic, newState) // e.g. /pin/17
			}
		}
	case "out":
		// Drive the pin from its state file.
		if config.EnableFileSystem {
			if readPinFileState(pinConfig.File) {
				pin.High()
			} else {
				pin.Low()
			}
		}
		// Publish the state if it was changed through the file. The state is
		// stored so that a change is published once instead of on every
		// polling cycle.
		newState := stringState(pin.Read())
		if newState != config.Pins[i].State {
			config.Pins[i].State = newState
			if config.EnableMQTTClient {
				publishOrLog(pinConfig.MQTTTopic, newState) // e.g. /pin/17
			}
		}
	}

	// Pin state output on the console and in the MQTT overview.
	res := pin.Read()
	fmt.Printf("%v: %v\n", pinConfig.Pin, res)
	out.WriteString(strconv.Itoa(int(pinConfig.Pin)))
	out.WriteString(": ")
	out.WriteString(strconv.Itoa(int(res)))
	out.WriteString("\r\n")
}

// publishOrLog publishes payload on topic through MQTTClient. A failed
// publish is reported on the console; no reconnect is attempted.
func publishOrLog(topic, payload string) {
	pubmsg := message.NewPublishMessage()
	pubmsg.SetTopic([]byte(topic))
	pubmsg.SetPayload([]byte(payload))
	if err := MQTTClient.Publish(pubmsg, nil); err != nil {
		fmt.Print("Fehler: ")
		fmt.Println(err)
	}
}

//--------------------------------------------------------------------------
// Helper functions
//--------------------------------------------------------------------------

// fileExists reports whether filename exists and is not a directory. Any
// error from os.Stat (not only "does not exist") yields false.
func fileExists(filename string) bool {
	info, err := os.Stat(filename)
	if err != nil {
		// info is nil on every error, so it must not be dereferenced.
		return false
	}
	return !info.IsDir()
}

// stringState converts a pin state to "0" (Low) or "1" (High). Any other
// value yields "".
func stringState(state rpio.State) string {
	switch state {
	case Low:
		return "0"
	case High:
		return "1"
	}
	return ""
}

// readPinFileState reports whether the file's content, with surrounding
// whitespace trimmed, is "1". A missing or unreadable file yields false.
func readPinFileState(filename string) bool {
	if !fileExists(filename) {
		return false
	}
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		return false
	}
	return strings.TrimSpace(string(content)) == "1"
}

// writePinFileState replaces the content of filename with content, creating
// the file if necessary. An empty filename is a no-op that returns true. On
// failure the error is printed and false is returned.
func writePinFileState(filename string, content string) bool {
	if filename == "" {
		return true
	}
	if err := ioutil.WriteFile(filename, []byte(content), 0666); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}

// readConfig loads the JSON configuration from filename. It never returns
// nil: if the file cannot be read or parsed the problem is logged and the
// configuration decoded so far (zero values when nothing was decoded) is
// returned.
func readConfig(filename string) *Configuration {
	conf := &Configuration{}

	b, err := ioutil.ReadFile(filename)
	if err != nil {
		log.Printf("reading config %q: %v", filename, err)
		return conf
	}
	if err := json.Unmarshal(b, conf); err != nil {
		log.Printf("parsing config %q: %v", filename, err)
	}
	return conf
}

// State of pin, High / Low
const (
	Low rpio.State = iota
	High
)

// mqttConnect connects to the broker from the configuration, publishes a test
// message on "/topic", subscribes to the MQTT topic of every configured pin
// and returns the connected client. It also stores the client in the
// package-level client variable. A failed connect terminates the program.
func mqttConnect() service.Client {
	// Instantiates a new Client
	client = &service.Client{}

	// Creates a new MQTT CONNECT message and sets the proper parameters
	msg := message.NewConnectMessage()
	msg.SetWillQos(0)
	msg.SetVersion(4)
	msg.SetCleanSession(true)
	msg.SetClientId([]byte("gPIo"))
	msg.SetKeepAlive(20)
	// Currently unused options:
	//msg.SetWillTopic([]byte("will"))
	//msg.SetWillMessage([]byte("send me home"))
	//msg.SetUsername([]byte("gPIo"))
	//msg.SetPassword([]byte(""))

	// Connects to the configured broker, e.g. "tcp://127.0.0.1:1883"
	addr := "tcp://" + config.MQTTClientAddress + ":" + config.MQTTClientPort
	if err := client.Connect(addr, msg); err != nil {
		log.Fatal(err)
	}

	// Publish a test message to the server.
	pubmsg := message.NewPublishMessage()
	pubmsg.SetTopic([]byte("/topic"))
	pubmsg.SetPayload([]byte("msg"))
	pubmsg.SetQoS(0)
	if err := client.Publish(pubmsg, nil); err != nil {
		fmt.Print("Fehler: ")
		fmt.Println(err)
	}

	// Subscribe to the MQTT topics of all pins.
	submsg := message.NewSubscribeMessage()
	for _, pincfg := range config.Pins {
		if pincfg.MQTTTopic != "" {
			fmt.Println(" - Subscribe MQTT-Topic: " + pincfg.MQTTTopic)
			submsg.AddTopic([]byte(pincfg.MQTTTopic), 0)
		}
	}
	if err := client.Subscribe(submsg, nil, mqttOnPublishFunc); err != nil {
		fmt.Print("Fehler: ")
		fmt.Println(err)
	}

	return *client
}

// mqttOnPublishFunc handles a message received on a subscribed topic. For
// every pin configured with that topic, payload "1" drives the pin high and
// "0" drives it low (other payloads leave it untouched); the resulting pin
// state is then stored in the configuration and, if enabled, written to the
// pin's file. It always returns nil.
func mqttOnPublishFunc(msg *message.PublishMessage) error {
	topic := string(msg.Topic())
	payload := string(msg.Payload())
	fmt.Println(topic)
	fmt.Println(payload)

	for i, pinConfig := range config.Pins {
		if pinConfig.MQTTTopic != topic {
			continue
		}

		pin := rpio.Pin(pinConfig.Pin)
		switch payload {
		case "1":
			pin.High()
		case "0":
			pin.Low()
		}

		// Store the pin state in the configuration.
		newState := stringState(pin.Read())
		config.Pins[i].State = newState

		// Write the pin state to the file.
		if config.EnableFileSystem {
			writePinFileState(pinConfig.File, newState)
		}
	}
	return nil
}

// mqttPubMSG publishes msg on topic with QoS 0. If publishing fails the
// error is printed, the client is disconnected and reconnected via
// mqttConnect, and the publish is retried once.
func mqttPubMSG(topic string, msg string) {
	// Creates a new PUBLISH message with the appropriate contents for publishing
	pubmsg := message.NewPublishMessage()
	pubmsg.SetTopic([]byte(topic))
	pubmsg.SetPayload([]byte(msg))
	pubmsg.SetQoS(0)

	// Publishes to the server by sending the message
	err := MQTTClient.Publish(pubmsg, nil)
	if err == nil {
		return
	}
	fmt.Print("Fehler: ")
	fmt.Println(err)

	MQTTClient.Disconnect()
	MQTTClient = mqttConnect()
	if err := MQTTClient.Publish(pubmsg, nil); err != nil {
		fmt.Print("Fehler: ")
		fmt.Println(err)
	}
}

// heartbeatProcess pings the broker through c every KeepAlive seconds and
// prints any ping error. It never returns.
func heartbeatProcess(c *service.Client, KeepAlive int) {
	for range time.Tick(time.Duration(KeepAlive) * time.Second) {
		if err := c.Ping(nil); err != nil {
			fmt.Print("Fehler: ")
			fmt.Println(err)
		}
	}
}