commit f058b94f1fa785566d9b5130b2ae5f68e338137f Author: Sven Date: Sat Dec 7 22:59:56 2019 +0100 Initial Commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..50b033c --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +out/ +test.exe diff --git a/README.md b/README.md new file mode 100644 index 0000000..6396081 --- /dev/null +++ b/README.md @@ -0,0 +1,40 @@ +# Go gPIo app + +Binarys befinden sich unter https://nc.masilux.de/index.php/s/bEzP2msezHPzyee + +Zum Kompilieren folgende Go-Abhängigkeiten installieren: + - `go get github.com/gorilla/mux` + - `go get github.com/stianeikeland/go-rpio` + - `go get github.com/frickelblog/surgemq/service` + - `go get github.com/frickelblog/message` + +Kompilieren mit: `go build main.go -o gPIo.bin` +Kompilieren unter Windows (Powesrhell) mit Zielsystem Linux/ARM: `$env:GOOS="linux"; $env:GOARCH="arm"; go build -o gPIo.arm` + + +**Konfiguration** +Die Konfiguration erfolgt in der Datei `config.json`, z.B: +``` +{ + "debug": true, + "enablefilesystem": true, + "enablemqttserver": true, + "enablemqttclient": true, + "mqttserverhost": "127.0.0.1", + "mqttservertcpport": "1883", + "mqttserverwsport": "8080", + "mqttclientaddress": "127.0.0.1", + "mqttclientport": "1883", + "mqttclientuser": "", + "mqttclientpass": "", + "pins": [ + { "pin":17, "io":"in", "state":"0", "file":"", "mqtttopic":"/pin/17" }, + { "pin":20, "io":"out", "state":"1", "file":"", "mqtttopic":"/pin/20" }, + { "pin":21, "io":"in", "state":"0", "file":"", "mqtttopic":"/pin/21" }, + { "pin":22, "io":"in", "state":"0", "file":"","mqtttopic":"/pin/22" } + ] +} +``` + +**Anwendung** + diff --git a/config.json b/config.json new file mode 100644 index 0000000..7d52368 --- /dev/null +++ b/config.json @@ -0,0 +1,19 @@ +{ + "debug": true, + "enablefilesystem": true, + "enablemqttserver": true, + "enablemqttclient": true, + "mqttserverhost": "127.0.0.1", + "mqttservertcpport": "1883", + "mqttserverwsport": "8080", + "mqttclientaddress": "127.0.0.1", + "mqttclientport": "1883", + "mqttclientuser": "", + "mqttclientpass": "", + "pins": [ + { "pin":17, "io":"in", "state":"0", "file":"", "mqtttopic":"/pin/17" }, + { "pin":20, "io":"out", "state":"1", "file":"", "mqtttopic":"/pin/20" }, + { "pin":21, "io":"in", "state":"0", "file":"", "mqtttopic":"/pin/21" }, + { "pin":22, "io":"in", "state":"0", "file":"","mqtttopic":"/pin/22" } + ] +} \ No newline at end of file diff --git a/gpio-numbers-pi2.png b/gpio-numbers-pi2.png new file mode 100644 index 0000000..c50ea4f Binary files /dev/null and b/gpio-numbers-pi2.png differ diff --git a/main.go b/main.go new file mode 100644 index 0000000..e9cbb10 --- /dev/null +++ b/main.go @@ -0,0 +1,409 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/frickelblog/message" + "github.com/frickelblog/surgemq/service" + "github.com/stianeikeland/go-rpio" +) + +var ( + config = readConfig("./config.json") + MQTTClient service.Client + srv *service.Server + client *service.Client +) + +type Configuration struct { + Debug bool `json:"debug"` + EnableFileSystem bool `json:"enablefilesystem"` + EnableMQTTServer bool `json:"enablemqttserver"` + EnableMQTTClient bool `json:"enablemqttclient"` + MQTTServerHost string `json:"mqttserverhost"` + MQTTServerTCPPort string `json:"mqttservertcpport"` + MQTTServerWSPort string `json:"mqttserverwsport"` + MQTTClientAddress string `json:"mqttclientaddress"` + MQTTClientPort string `json:"mqttclientport"` + MQTTClientUser string `json:"mqttclientuser"` + MQTTClientPass string `json:"mqttclientpass"` + Pins []PinConfiguration `json:"pins"` +} + +type PinConfiguration struct { + Pin int8 `json:"pin"` + IO string `json:"io"` + File string `json:"file"` + MQTTTopic string `json:"mqtttopic"` + State string `json:"state"` +} + +// our main function +func main() { + + // -------------------------------------------------------------------------------- + // Create a new mqtt server + if config.EnableMQTTServer { + srv = &service.Server{ + KeepAlive: 300, // seconds + ConnectTimeout: 2, // seconds + SessionsProvider: "mem", // keeps sessions in memory + Authenticator: "mockSuccess", // always succeed + TopicsProvider: "mem", // keeps topic subscriptions in memory + } + + // Websocket // "tcp://127.0.0.1:1883" => TCP COnnection vom MQTT Server als WS-Backend + addr := "tcp://" + config.MQTTServerHost + ":" + config.MQTTServerTCPPort + AddWebsocketHandler("/mqtt", addr) + go ListenAndServeWebsocket(":" + config.MQTTServerWSPort) // ":8081" => WS-Connection + // MQTT-Server + go srv.ListenAndServe(addr) + } + // -------------------------------------------------------------------------------- + + for i, s := range config.Pins { + fmt.Println(i, s) + } + + fmt.Println("--------") + + if config.EnableMQTTClient { + fmt.Println("Warte auf MQTT Client...") + time.Sleep(1000 * time.Millisecond) // 2 Sek auf Server warten + + MQTTClient = mqttConnect() + go heartbeatProcess(&MQTTClient, 30) + } + /* + icounter := 0 + for { + icounter = icounter + 1 + time.Sleep(1000 * time.Millisecond) + mqttPubMSG("/pin/17", string(icounter)) + } + */ + + err := rpio.Open() + if err != nil { + panic(fmt.Sprint("unable to open gpio", err.Error())) + } + + defer rpio.Close() + + //------------------------------------------------------------ + // Pin Configuration + for _, pinConfig := range config.Pins { + pin := rpio.Pin(pinConfig.Pin) + + if pinConfig.IO == "in" { + pin.Input() + //pin.PullUp() + //pin.PullDown() + //pin.Detect(rpio.RiseEdge) // AnyEdge / FallEdge 1->0 / RiseEdge 0->1 + } + + if pinConfig.IO == "out" { + pin.Output() + + // Default PinState aus config setzen + if pinConfig.State == "1" { + pin.High() + + } else { + pin.Low() + } + } + } + + //------------------------------------------------------------ + mqttStreamOutput := "" + + for { + + for i, pinConfig := range config.Pins { + //fmt.Println(i, pinConfig) + + pin := rpio.Pin(pinConfig.Pin) + + // Edge-Detection test + //if pin.EdgeDetected() { + // newState := stringState(pin.Read()) + // fmt.Println("Edge raised: " + newState) + //} + + if pinConfig.IO == "in" { + + newState := stringState(pin.Read()) + + if newState != pinConfig.State { + // Pinstate in Config speichern + config.Pins[i].State = newState + + // Pinstate in Datei schreiben + if config.EnableFileSystem { + writePinFileState(pinConfig.File, newState) + } + + // Pinstate in MQTT-Topic kippen + if config.EnableMQTTClient { + pubmsg := message.NewPublishMessage() + pubmsg.SetTopic([]byte(pinConfig.MQTTTopic)) // /pin/17 + pubmsg.SetPayload([]byte(newState)) + MQTTClient.Publish(pubmsg, nil) + } + } + + } + + if pinConfig.IO == "out" { + + // PinState aus Datei lesen + if config.EnableFileSystem { + if readPinFileState(pinConfig.File) { + pin.High() + + } else { + pin.Low() + } + } + + // Pinstate in MQTT-Topic kippen, falls er durch Datei geändert wurde + newState := stringState(pin.Read()) + if newState != config.Pins[i].State { + if config.EnableMQTTClient { + pubmsg := message.NewPublishMessage() + pubmsg.SetTopic([]byte(pinConfig.MQTTTopic)) // /pin/17 + pubmsg.SetPayload([]byte(newState)) + MQTTClient.Publish(pubmsg, nil) + } + } + } + + // Pinstate Ausgabe auf der Konsole + res := pin.Read() + fmt.Print(pinConfig.Pin) + fmt.Print(": ") + fmt.Println(res) + + mqttStreamOutput = mqttStreamOutput + "" + strconv.Itoa(int(pinConfig.Pin)) + ": " + strconv.Itoa(int(res)) + "\r\n" + } + + if config.EnableMQTTClient { + pubmsg := message.NewPublishMessage() + pubmsg.SetTopic([]byte("/pins")) // /pin/17 + pubmsg.SetPayload([]byte(mqttStreamOutput)) + MQTTClient.Publish(pubmsg, nil) + } + mqttStreamOutput = "" + + // 100ms warten + time.Sleep(400 * time.Millisecond) + cmd := exec.Command("clear") //Linux example, its tested + cmd.Stdout = os.Stdout + cmd.Run() + } + +} + +//-------------------------------------------------------------------------- +// Hilfsfunktionen +//-------------------------------------------------------------------------- + +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + +func stringState(state rpio.State) string { + if state == Low { + return "0" + } + if state == High { + return "1" + } + + return "" +} + +func readPinFileState(filename string) bool { + if fileExists(filename) { + content, err := ioutil.ReadFile(filename) + if err != nil { + return false + } + + text := strings.TrimSpace(string(content)) + if strings.EqualFold(text, "1") { + return true + } + + } + + // Standardwert: false + return false +} + +func writePinFileState(filename string, content string) bool { + if filename != "" { + f, err := os.Create(filename) + if err != nil { + fmt.Println(err) + return false + } + _, err = f.WriteString(content) + if err != nil { + fmt.Println(err) + f.Close() + return false + } + + err = f.Close() + if err != nil { + fmt.Println(err) + return false + } + } + return true +} + +func readConfig(filename string) *Configuration { + // initialize conf with default values. + conf := &Configuration{} + + b, err := ioutil.ReadFile(filename) + if err != nil { + return conf + } + if err = json.Unmarshal(b, conf); err != nil { + return conf + } + return conf +} + +// State of pin, High / Low +const ( + Low rpio.State = iota + High +) + +func mqttConnect() service.Client { + // Instantiates a new Client + client = &service.Client{} + + // Creates a new MQTT CONNECT message and sets the proper parameters + msg := message.NewConnectMessage() + msg.SetWillQos(0) + msg.SetVersion(4) + msg.SetCleanSession(true) + msg.SetClientId([]byte("gPIo")) + msg.SetKeepAlive(20) + //msg.SetWillTopic([]byte("will")) + //msg.SetWillMessage([]byte("send me home")) + //msg.SetUsername([]byte("gPIo")) + //msg.SetPassword([]byte("")) + + // Connects to the remote server at 127.0.0.1 port 1883 + //c.Connect("tcp://127.0.0.1:1883", msg) + //c.Connect("tcp://test.mosquitto.org:1883", msg) + if err := client.Connect("tcp://"+config.MQTTClientAddress+":"+config.MQTTClientPort, msg); err != nil { + log.Fatal(err) + } + + //time.Sleep(5000 * time.Millisecond) + + pubmsg := message.NewPublishMessage() + pubmsg.SetTopic([]byte("/topic")) + pubmsg.SetPayload([]byte("msg")) + pubmsg.SetQoS(0) + + // Publishes to the server by sending the message + err := client.Publish(pubmsg, nil) + if err != nil { + fmt.Print("Fehler: ") + } + + // Subscribe MQTTTopics + submsg := message.NewSubscribeMessage() + + for _, pincfg := range config.Pins { + if pincfg.MQTTTopic != "" { + fmt.Println(" - Subscribe MQTT-Topic: " + pincfg.MQTTTopic) + submsg.AddTopic([]byte(pincfg.MQTTTopic), 0) + } + } + + client.Subscribe(submsg, nil, mqttOnPublishFunc) + + return *client +} + +func mqttOnPublishFunc(msg *message.PublishMessage) error { + topic := string(msg.Topic()) + payload := string(msg.Payload()) + + fmt.Println(topic) + fmt.Println(payload) + + for i, pinConfig := range config.Pins { + if pinConfig.MQTTTopic == topic { + pin := rpio.Pin(pinConfig.Pin) + if payload == "1" { + pin.High() + } else if payload == "0" { + pin.Low() + } + + // Pinstate in Config speichern + newState := stringState(pin.Read()) + config.Pins[i].State = newState + + // Pinstate in Datei schreiben + if config.EnableFileSystem { + writePinFileState(pinConfig.File, newState) + } + } + } + + return nil +} + +func mqttPubMSG(topic string, msg string) { + // Creates a new PUBLISH message with the appropriate contents for publishing + pubmsg := message.NewPublishMessage() + pubmsg.SetTopic([]byte(topic)) + pubmsg.SetPayload([]byte(msg)) + pubmsg.SetQoS(0) + + // Publishes to the server by sending the message + err := MQTTClient.Publish(pubmsg, nil) + if err != nil { + fmt.Print("Fehler: ") + fmt.Println(err) + //MQTTClient.Unsubscribe() + MQTTClient.Disconnect() + MQTTClient = mqttConnect() + MQTTClient.Publish(pubmsg, nil) + } +} + +func heartbeatProcess(c *service.Client, KeepAlive int) { + for _ = range time.Tick(time.Duration(KeepAlive) * time.Second) { + //fmt.Println("Ping OK") + err := c.Ping(nil) + if err != nil { + fmt.Print("Fehler: ") + fmt.Println(err) + } + } +} diff --git a/websocket.go b/websocket.go new file mode 100644 index 0000000..7d20e9e --- /dev/null +++ b/websocket.go @@ -0,0 +1,102 @@ +package main + +import ( + "io" + "net" + "net/http" + "net/url" + + "github.com/surge/glog" + "golang.org/x/net/websocket" +) + +func DefaultListenAndServeWebsocket() error { + if err := AddWebsocketHandler("/mqtt", "test.mosquitto.org:1883"); err != nil { + return err + } + return ListenAndServeWebsocket(":1234") +} + +func AddWebsocketHandler(urlPattern string, uri string) error { + glog.Debugf("AddWebsocketHandler urlPattern=%s, uri=%s", urlPattern, uri) + u, err := url.Parse(uri) + if err != nil { + glog.Errorf("surgemq/main: %v", err) + return err + } + + h := func(ws *websocket.Conn) { + WebsocketTcpProxy(ws, u.Scheme, u.Host) + } + http.Handle(urlPattern, websocket.Handler(h)) + return nil +} + +/* start a listener that proxies websocket <-> tcp */ +func ListenAndServeWebsocket(addr string) error { + return http.ListenAndServe(addr, nil) +} + +/* starts an HTTPS listener */ +func ListenAndServeWebsocketSecure(addr string, cert string, key string) error { + return http.ListenAndServeTLS(addr, cert, key, nil) +} + +/* copy from websocket to writer, this copies the binary frames as is */ +func io_copy_ws(src *websocket.Conn, dst io.Writer) (int, error) { + var buffer []byte + count := 0 + for { + err := websocket.Message.Receive(src, &buffer) + if err != nil { + return count, err + } + n := len(buffer) + count += n + i, err := dst.Write(buffer) + if err != nil || i < 1 { + return count, err + } + } + return count, nil +} + +/* copy from reader to websocket, this copies the binary frames as is */ +func io_ws_copy(src io.Reader, dst *websocket.Conn) (int, error) { + buffer := make([]byte, 2048) + count := 0 + for { + n, err := src.Read(buffer) + if err != nil || n < 1 { + return count, err + } + count += n + err = websocket.Message.Send(dst, buffer[0:n]) + if err != nil { + return count, err + } + } + return count, nil +} + +/* handler that proxies websocket <-> unix domain socket */ +func WebsocketTcpProxy(ws *websocket.Conn, nettype string, host string) error { + client, err := net.Dial(nettype, host) + if err != nil { + return err + } + defer client.Close() + defer ws.Close() + chDone := make(chan bool) + + go func() { + io_ws_copy(client, ws) + chDone <- true + }() + go func() { + io_copy_ws(ws, client) + chDone <- true + }() + <-chDone + return nil +}