1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

go: add WebRTC node-type

This commit is contained in:
Steffen Vogel 2022-03-14 15:32:49 -04:00
parent fa398a2cf7
commit 5542dfbba1
10 changed files with 850 additions and 7 deletions

View file

@ -0,0 +1,15 @@
nodes = {
example_node = {
type = "go.example"
value = 555
format = "json"
in = {
# signals ={
# count = 5,
# type = "float"
# }
}
}
}

View file

@ -0,0 +1,38 @@
nodes = {
loopback_node = {
type = "go.loopback"
value = 555
format = "opal.asyncip"
in = {
signals ={
count = 5,
type = "float"
}
}
}
signal = {
type = "signal"
signal = "counter"
rate = 1
values = 5
}
}
paths = (
{
in = "loopback_node"
hooks = (
{
type = "dump"
}
)
},
{
in = "signal",
out = "loopback_node"
}
)

View file

@ -0,0 +1,31 @@
nodes = {
webrtc_node = {
type = "webrtc",
format = "json"
# A unique session identifier which must be shared between two nodes
session = "my-session-name"
# Address to the websocket signaling server
server = "wss://ws-signal.villas.k8s.eonerc.rwth-aachen.de"
# Setting for Interactive Connectivity Establishment
ice = {
# List of STUN/TURN servers
servers = (
{
urls = [
"stun:stun.0l.de:3478",
"turn:turn.0l.de:3478?transport=udp",
"turn:turn.0l.de:3478?transport=tcp"
],
# Credentials for TURN servers
username = "villas"
password = "villas"
}
)
}
}
}

View file

@ -6,6 +6,7 @@ import (
"time"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg/errors"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg/nodes"
)
@ -14,6 +15,8 @@ type Node struct {
ticker time.Ticker
lastSequence uint64
Config Config
}
@ -25,8 +28,9 @@ type Config struct {
func NewNode() nodes.Node {
return &Node{
BaseNode: nodes.NewBaseNode(),
ticker: *time.NewTicker(1 * time.Second),
BaseNode: nodes.NewBaseNode(),
ticker: *time.NewTicker(1 * time.Second),
lastSequence: 0,
}
}
@ -54,10 +58,16 @@ func (n *Node) Start() error {
func (n *Node) Read() ([]byte, error) {
select {
case <-n.ticker.C:
return pkg.GenerateRandomSample().Bytes(), nil
case <-n.Stopped:
return nil, nil
return nil, errors.ErrEndOfFile
case <-n.ticker.C:
n.lastSequence++
smp := pkg.GenerateRandomSample()
smp.Sequence = n.lastSequence
smps := []pkg.Sample{smp}
return json.Marshal(smps)
}
}

View file

@ -47,10 +47,11 @@ func (n *Node) Start() error {
func (n *Node) Read() ([]byte, error) {
select {
case buf := <-n.channel:
return buf, nil
case <-n.Stopped:
return nil, errors.ErrEndOfFile
case buf := <-n.channel:
return buf, nil
}
}

View file

@ -0,0 +1,33 @@
package webrtc
import "time"
var (
DefaultExponentialBackoff = ExponentialBackoff{
Factor: 1.5,
Maximum: 1 * time.Minute,
Initial: 1 * time.Second,
Duration: 1 * time.Second,
}
)
type ExponentialBackoff struct {
Factor float32
Maximum time.Duration
Initial time.Duration
Duration time.Duration
}
func (e *ExponentialBackoff) Next() time.Duration {
e.Duration = time.Duration(1.5 * float32(e.Duration)).Round(time.Second)
if e.Duration > e.Maximum {
e.Duration = e.Maximum
}
return e.Duration
}
func (e *ExponentialBackoff) Reset() {
e.Duration = e.Initial
}

View file

@ -0,0 +1,167 @@
package webrtc
import (
"fmt"
"net/url"
"time"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg/nodes"
"github.com/gorilla/websocket"
)
type SignalingClient struct {
*websocket.Conn
logger nodes.Logger
URL *url.URL
done chan struct{}
isClosing bool
backoff ExponentialBackoff
messageCallbacks []func(msg *SignalingMessage)
connectCallbacks []func()
disconnectCallbacks []func()
}
func NewSignalingClient(u *url.URL, logger nodes.Logger) (*SignalingClient, error) {
c := &SignalingClient{
messageCallbacks: []func(msg *SignalingMessage){},
connectCallbacks: []func(){},
disconnectCallbacks: []func(){},
isClosing: false,
backoff: DefaultExponentialBackoff,
URL: u,
logger: logger,
}
return c, nil
}
func (c *SignalingClient) OnConnect(cb func()) {
c.connectCallbacks = append(c.connectCallbacks, cb)
}
func (c *SignalingClient) OnDisconnect(cb func()) {
c.disconnectCallbacks = append(c.connectCallbacks, cb)
}
func (c *SignalingClient) OnMessage(cb func(msg *SignalingMessage)) {
c.messageCallbacks = append(c.messageCallbacks, cb)
}
func (c *SignalingClient) SendSignalingMessage(msg *SignalingMessage) error {
c.logger.Infof("Sending signaling message: %s", msg)
return c.Conn.WriteJSON(msg)
}
func (c *SignalingClient) Close() error {
// Return immediatly if there is no open connection
if c.Conn == nil {
return nil
}
c.isClosing = true
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.Conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(5*time.Second))
if err != nil {
return fmt.Errorf("failed to send close message: %s", err)
}
select {
case <-c.done:
c.logger.Infof("Connection closed")
case <-time.After(3 * time.Second):
c.logger.Warn("Timed-out waiting for connection close")
}
return nil
}
func (c *SignalingClient) Connect() error {
var err error
dialer := websocket.Dialer{
HandshakeTimeout: 1 * time.Second,
}
c.Conn, _, err = dialer.Dial(c.URL.String(), nil)
if err != nil {
return fmt.Errorf("failed to dial %s: %w", c.URL, err)
}
for _, cb := range c.connectCallbacks {
cb()
}
go c.read()
// Reset reconnect timer
c.backoff.Reset()
c.done = make(chan struct{})
return nil
}
func (c *SignalingClient) ConnectWithBackoff() error {
t := time.NewTimer(c.backoff.Duration)
for range t.C {
if err := c.Connect(); err != nil {
t.Reset(c.backoff.Next())
c.logger.Errorf("Failed to connect: %s. Reconnecting in %s", err, c.backoff.Duration)
} else {
break
}
}
return nil
}
func (c *SignalingClient) read() {
for {
msg := &SignalingMessage{}
if err := c.Conn.ReadJSON(msg); err != nil {
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
} else {
c.logger.Errorf("Failed to read: %s", err)
}
break
}
c.logger.Infof("Received signaling message: %s", msg)
for _, cb := range c.messageCallbacks {
cb(msg)
}
}
c.closed()
}
func (c *SignalingClient) closed() {
if err := c.Conn.Close(); err != nil {
c.logger.Errorf("Failed to close connection: %s", err)
}
c.Conn = nil
for _, cb := range c.disconnectCallbacks {
cb()
}
close(c.done)
if c.isClosing {
c.logger.Infof("Connection closed")
} else {
c.logger.Warnf("Connection lost. Reconnecting in %s", c.backoff.Duration)
go c.ConnectWithBackoff()
}
}

181
go/pkg/nodes/webrtc/node.go Normal file
View file

@ -0,0 +1,181 @@
package webrtc
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"
verrors "git.rwth-aachen.de/acs/public/villas/node/go/pkg/errors"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg/nodes"
"github.com/pion/webrtc/v3"
)
var (
DefaultConfig = Config{
Server: &url.URL{
Scheme: "wss",
Host: "wss://ws-signal.villas.k8s.eonerc.rwth-aachen.de",
},
WebRTC: webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
// URLs: []string{"stun:stun.l.google.com:19302"},
URLs: []string{
"stun:edgy.0l.de:3478",
"turn:edgy.0l.de:3478?transport=udp",
"turn:edgy.0l.de:3478?transport=tcp",
},
CredentialType: webrtc.ICECredentialTypePassword,
Username: "villas",
Credential: "villas",
},
},
},
}
)
type Node struct {
nodes.BaseNode
*PeerConnection
Config Config
}
type Config struct {
Server *url.URL
Session string
WebRTC webrtc.Configuration
}
func NewNode() nodes.Node {
return &Node{
Config: DefaultConfig,
}
}
func (n *Node) Parse(c []byte) error {
var err error
var cfg struct {
Session *string `json:"session"`
Server *string `json:"server,omitempty"`
Ice *struct {
Servers []struct {
URLs []string `json:"urls,omitempty"`
Username *string `json:"username,omitempty"`
Password *string `json:"password,omitempty"`
} `json:"servers,omitempty"`
} `json:"ice,omitempty"`
}
if err := json.Unmarshal(c, &cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
if cfg.Session == nil || *cfg.Session == "" {
return errors.New("missing or invalid session name")
} else {
n.Config.Session = *cfg.Session
}
if cfg.Server != nil {
n.Config.Server, err = url.Parse(*cfg.Server)
if err != nil {
return fmt.Errorf("failed to parse server address: %w", err)
}
}
if cfg.Ice != nil {
for _, server := range cfg.Ice.Servers {
iceServer := webrtc.ICEServer{
URLs: server.URLs,
}
if server.Username != nil && server.Password != nil {
iceServer.Username = *server.Username
iceServer.CredentialType = webrtc.ICECredentialTypePassword
iceServer.Credential = *server.Password
}
n.Config.WebRTC.ICEServers = append(n.Config.WebRTC.ICEServers, iceServer)
}
}
return nil
}
func (n *Node) Prepare() error {
var err error
n.PeerConnection, err = NewPeerConnection(&n.Config, n.Logger)
if err != nil {
return fmt.Errorf("failed to create peer connection: %w", err)
}
return nil
}
func (n *Node) Start() error {
n.DataChannelLock.Lock()
defer n.DataChannelLock.Unlock()
n.Logger.Info("Waiting until datachannel is connected...")
for n.DataChannel == nil {
n.DataChannelConnected.Wait()
}
return n.BaseNode.Start()
}
func (n *Node) Read() ([]byte, error) {
select {
case <-n.Stopped:
return nil, verrors.ErrEndOfFile
case b := <-n.PeerConnection.ReceivedMessages:
return b, nil
}
}
func (n *Node) Write(data []byte) error {
n.DataChannelLock.Lock()
defer n.DataChannelLock.Unlock()
if n.DataChannel == nil {
n.logger.Infof("No datachannel open. Skipping sample...")
return nil
}
return n.DataChannel.Send(data)
}
func (n *Node) PollFDs() ([]int, error) {
return []int{}, nil
}
func (n *Node) NetemFDs() ([]int, error) {
return []int{}, nil
}
func (n *Node) Details() string {
details := map[string]string{
"server": n.Config.Server.String(),
"session": n.Config.Session,
}
kv := []string{}
for k, v := range details {
kv = append(kv, fmt.Sprintf("%s=%s", k, v))
}
return strings.Join(kv, ", ")
}
func (n *Node) Close() error {
return nil
}
func init() {
nodes.RegisterNodeType("webrtc", "Web Real-time Communication", NewNode, nodes.NodeSupportsRead|nodes.NodeSupportsWrite)
}

View file

@ -0,0 +1,37 @@
package webrtc
import (
"encoding/json"
"time"
"github.com/pion/webrtc/v3"
)
type Connection struct {
ID int `json:"id"`
Remote string `json:"remote"`
UserAgent string `json:"user_agent"`
Created time.Time `json:"created"`
}
type ControlMessage struct {
ConnectionID int `json:"connection_id"`
Connections []Connection `json:"connections"`
}
type Role struct {
Polite bool `json:"polite"`
First bool `json:"first"`
}
type SignalingMessage struct {
Description *webrtc.SessionDescription `json:"description,omitempty"`
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
Control *ControlMessage `json:"control,omitempty"`
}
func (msg SignalingMessage) String() string {
b, _ := json.Marshal(msg)
return string(b)
}

View file

@ -0,0 +1,330 @@
package webrtc
import (
"fmt"
"sync"
"git.rwth-aachen.de/acs/public/villas/node/go/pkg/nodes"
"github.com/pion/webrtc/v3"
)
type PeerConnection struct {
*webrtc.PeerConnection
*SignalingClient
config *Config
logger nodes.Logger
makingOffer bool
ignoreOffer bool
first bool
polite bool
rollback bool
DataChannel *webrtc.DataChannel
DataChannelLock sync.Mutex
DataChannelConnected *sync.Cond
ReceivedMessages chan []byte
}
func NewPeerConnection(config *Config, logger nodes.Logger) (*PeerConnection, error) {
u := *config.Server
u.Path = "/" + config.Session
sc, err := NewSignalingClient(&u, logger)
if err != nil {
return nil, fmt.Errorf("failed to create signaling client: %w", err)
}
ppc := &PeerConnection{
SignalingClient: sc,
config: config,
logger: logger,
DataChannel: nil,
ReceivedMessages: make(chan []byte, 1024),
}
ppc.DataChannelConnected = sync.NewCond(&ppc.DataChannelLock)
ppc.SignalingClient.OnMessage(ppc.OnSignalingMessageHandler)
ppc.SignalingClient.OnConnect(ppc.OnSignalingConnectedHandler)
if err := ppc.SignalingClient.ConnectWithBackoff(); err != nil {
return nil, fmt.Errorf("failed to connect signaling client: %w", err)
}
return ppc, nil
}
func (pc *PeerConnection) createPeerConnection() (*webrtc.PeerConnection, error) {
pc.logger.Info("Created new peer connection")
// Create a new RTCPeerConnection
ppc, err := webrtc.NewPeerConnection(pc.config.WebRTC)
if err != nil {
return nil, fmt.Errorf("failed to create peer connection: %w", err)
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
ppc.OnConnectionStateChange(pc.OnConnectionStateChangeHandler)
ppc.OnSignalingStateChange(pc.OnSignalingStateChangeHandler)
ppc.OnICECandidate(pc.OnICECandidateHandler)
ppc.OnNegotiationNeeded(pc.OnNegotiationNeededHandler)
ppc.OnDataChannel(pc.OnDataChannelHandler)
return ppc, nil
}
func (pc *PeerConnection) createDataChannel() (*webrtc.DataChannel, error) {
dc, err := pc.CreateDataChannel("villas", nil)
if err != nil {
return nil, fmt.Errorf("failed to create datachannel: %w", err)
}
close := make(chan struct{})
dc.OnClose(func() { pc.OnDataChannelCloseHandler(dc, close) })
dc.OnOpen(func() { pc.OnDataChannelOpenHandler(dc, close) })
dc.OnMessage(func(msg webrtc.DataChannelMessage) { pc.OnDataChannelMessageHandler(dc, &msg, close) })
return dc, nil
}
func (pc *PeerConnection) rollbackPeerConnection() (*webrtc.PeerConnection, error) {
pc.rollback = true
defer func() { pc.rollback = false }()
// Close previous peer connection in before creating a new one
// We need to do this as pion/webrtc currently does not support rollbacks
if err := pc.PeerConnection.Close(); err != nil {
return nil, fmt.Errorf("failed to close peer connection: %w", err)
}
if ppc, err := pc.createPeerConnection(); err != nil {
return nil, err
} else {
return ppc, nil
}
}
func (pc *PeerConnection) OnConnectionCreated() error {
if !pc.first {
if _, err := pc.createDataChannel(); err != nil {
return err
}
}
return nil
}
func (pc *PeerConnection) OnDataChannelOpenHandler(dc *webrtc.DataChannel, close chan struct{}) {
pc.logger.Infof("DataChannel opened: %s", dc.Label())
pc.DataChannelLock.Lock()
defer pc.DataChannelLock.Unlock()
pc.DataChannel = dc
pc.DataChannelConnected.Broadcast()
}
func (pc *PeerConnection) OnDataChannelCloseHandler(dc *webrtc.DataChannel, cl chan struct{}) {
pc.logger.Infof("DataChannel closed: %s", dc.Label())
pc.DataChannel = nil
close(cl)
// We close the connection here to avoid waiting for the disconnected event
if err := pc.PeerConnection.Close(); err != nil {
pc.logger.Errorf("Failed to close peer connection: %s", err)
}
}
func (pc *PeerConnection) OnDataChannelMessageHandler(dc *webrtc.DataChannel, msg *webrtc.DataChannelMessage, close chan struct{}) {
pc.logger.Debugf("Received: %s", string(msg.Data))
pc.ReceivedMessages <- msg.Data
}
func (pc *PeerConnection) OnDataChannelHandler(dc *webrtc.DataChannel) {
pc.logger.Infof("New DataChannel opened: %s", dc.Label())
close := make(chan struct{})
dc.OnOpen(func() { pc.OnDataChannelOpenHandler(dc, close) })
dc.OnClose(func() { pc.OnDataChannelCloseHandler(dc, close) })
dc.OnMessage(func(msg webrtc.DataChannelMessage) { pc.OnDataChannelMessageHandler(dc, &msg, close) })
}
func (pc *PeerConnection) OnICECandidateHandler(c *webrtc.ICECandidate) {
if c == nil {
pc.logger.Info("Candidate gathering concluded")
return
}
pc.logger.Infof("Found new candidate: %s", c)
ci := c.ToJSON()
if err := pc.SendSignalingMessage(&SignalingMessage{
Candidate: &ci,
}); err != nil {
pc.logger.Errorf("Failed to send candidate: %s", err)
}
}
func (pc *PeerConnection) OnNegotiationNeededHandler() {
pc.logger.Info("Negotation needed!")
pc.makingOffer = true
offer, err := pc.CreateOffer(nil)
if err != nil {
pc.logger.Panicf("Failed to create offer: %s", err)
}
if err := pc.SetLocalDescription(offer); err != nil {
pc.logger.Panicf("Failed to set local description: %s", err)
}
if err := pc.SendSignalingMessage(&SignalingMessage{
Description: &offer,
}); err != nil {
pc.logger.Panicf("Failed to send offer: %s", err)
}
pc.makingOffer = false
}
func (pc *PeerConnection) OnSignalingStateChangeHandler(ss webrtc.SignalingState) {
pc.logger.Infof("Signaling State has changed: %s", ss.String())
}
func (pc *PeerConnection) OnConnectionStateChangeHandler(pcs webrtc.PeerConnectionState) {
pc.logger.Infof("Connection State has changed: %s", pcs.String())
switch pcs {
case webrtc.PeerConnectionStateFailed:
fallthrough
case webrtc.PeerConnectionStateDisconnected:
pc.logger.Info("Closing peer connection")
if err := pc.PeerConnection.Close(); err != nil {
pc.logger.Panicf("Failed to close peer connection: %s", err)
}
case webrtc.PeerConnectionStateClosed:
if pc.rollback {
return
}
pc.logger.Info("Closed peer connection")
var err error
pc.PeerConnection, err = pc.createPeerConnection()
if err != nil {
pc.logger.Panicf("Failed to set re-create peer connection: %s", err)
}
if err := pc.OnConnectionCreated(); err != nil {
pc.logger.Panicf("Failed to create connection: %w", err)
}
}
}
func (pc *PeerConnection) OnSignalingConnectedHandler() {
var err error
pc.logger.Info("Signaling connected")
// Create initial peer connection
if pc.PeerConnection == nil {
if pc.PeerConnection, err = pc.createPeerConnection(); err != nil {
pc.logger.Panicf("Failed to create peer connection: %s", err)
}
if err := pc.OnConnectionCreated(); err != nil {
pc.logger.Panicf("Failed to create connection: %s", err)
}
}
}
func (pc *PeerConnection) OnSignalingMessageHandler(msg *SignalingMessage) {
var err error
if msg.Control != nil {
if len(msg.Control.Connections) > 2 {
pc.logger.Panicf("There are already two peers connected to this session.")
}
// The peer with the smallest connection ID connected first
pc.first = true
for _, c := range msg.Control.Connections {
if c.ID < msg.Control.ConnectionID {
pc.first = false
}
}
pc.polite = pc.first
pc.logger.Infof("New role: polite=%v, first=%v", pc.polite, pc.first)
} else if msg.Description != nil {
readyForOffer := !pc.makingOffer && pc.SignalingState() == webrtc.SignalingStateStable
offerCollision := msg.Description.Type == webrtc.SDPTypeOffer && !readyForOffer
pc.ignoreOffer = !pc.polite && offerCollision
if pc.ignoreOffer {
return
}
if msg.Description.Type == webrtc.SDPTypeOffer && pc.PeerConnection.SignalingState() != webrtc.SignalingStateStable {
if pc.PeerConnection, err = pc.rollbackPeerConnection(); err != nil {
pc.logger.Panicf("Failed to rollback peer connection: %s", err)
}
if err := pc.OnConnectionCreated(); err != nil {
pc.logger.Panicf("Failed to create connection: %s", err)
}
}
if err := pc.PeerConnection.SetRemoteDescription(*msg.Description); err != nil {
pc.logger.Panicf("Failed to set remote description: %s", err)
}
if msg.Description.Type == webrtc.SDPTypeOffer {
answer, err := pc.PeerConnection.CreateAnswer(nil)
if err != nil {
pc.logger.Panicf("Failed to create answer: %s", err)
}
if err := pc.SetLocalDescription(answer); err != nil {
pc.logger.Panicf("Failed to rollback signaling state: %s", err)
}
pc.SendSignalingMessage(&SignalingMessage{
Description: pc.LocalDescription(),
})
}
} else if msg.Candidate != nil {
if err := pc.AddICECandidate(*msg.Candidate); err != nil && !pc.ignoreOffer {
pc.logger.Panicf("Failed to add new ICE candidate: %s", err)
}
}
}
func (pc *PeerConnection) Close() error {
if err := pc.SignalingClient.Close(); err != nil {
return fmt.Errorf("failed to close signaling client: %w", err)
}
if err := pc.PeerConnection.Close(); err != nil {
return fmt.Errorf("failed to close peer connection: %w", err)
}
return nil
}