Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 208 additions & 11 deletions clients/iota-go/iotaconn/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,39 @@
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"

"github.com/iotaledger/hive.go/logger"
)

type WebsocketClient struct {
idCounter uint32
url string
conn *websocket.Conn
writeQueue chan *jsonrpcMessage
readers sync.Map // id -> chan *jsonrpcMessage
log *logger.Logger
shutdownWaitGroup sync.WaitGroup
reconnectMx sync.RWMutex
}

func NewWebsocketClient(
ctx context.Context,
url string,
log *logger.Logger,
) (*WebsocketClient, error) {
dialer := websocket.Dialer{}
conn, _, err := dialer.DialContext(ctx, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket server: %w", err)
}

c := &WebsocketClient{
conn: conn,
url: url,
writeQueue: make(chan *jsonrpcMessage),
log: log,
}

c.reconnect(ctx)

Check failure on line 42 in clients/iota-go/iotaconn/websocket.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `c.reconnect` is not checked (errcheck)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing, not a required fix: Maybe check an error? If context is canceled, if would be nice to return error, especially when we already have error returned from the contructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


c.shutdownWaitGroup.Add(1)
go c.loop(ctx)
return c, nil
Expand All @@ -47,6 +50,8 @@
c.shutdownWaitGroup.Wait()
}

// var counter = 0

func (c *WebsocketClient) loop(ctx context.Context) {
defer c.shutdownWaitGroup.Done()

Expand All @@ -56,15 +61,22 @@
}
receivedMsgs := make(chan readMsgResult)
go func() {
c.log.Infof("websocket loop started")
defer c.log.Infof("websocket loop finished")
defer close(receivedMsgs)
for {
m, p, err := c.conn.ReadMessage()
m, p, err := c.readMessage()
if err != nil {
c.log.Errorf("WebsocketClient read loop: %s", err)
return
} else {
receivedMsgs <- readMsgResult{messageType: m, p: p}
continue
}
var j *jsonrpcMessage
if err := json.Unmarshal(p, &j); err != nil {
c.log.Errorf("WebsocketClient: could not unmarshal response body: %s", err)
continue
}
c.log.Debugf("ws message was read: %v, %v", j.ID, j.Method)
receivedMsgs <- readMsgResult{messageType: m, p: p}
}
}()

Expand All @@ -79,7 +91,7 @@
c.log.Errorf("WebsocketClient: could not marshal json: %s", err)
continue
}
err = c.conn.WriteMessage(websocket.TextMessage, reqBody)
err = c.writeMessage(websocket.TextMessage, reqBody)
if err != nil {
c.log.Errorf("WebsocketClient: write error: %s", err)
return
Expand All @@ -88,6 +100,14 @@
if !ok {
return
}

// uncomment to simulate connection loss
// counter++
// if counter%30 == 0 {
// c.log.Infof("simulating connection loss")
// c.conn.Close()
// }

switch receivedMsg.messageType {
case websocket.TextMessage:
var m *jsonrpcMessage
Expand All @@ -99,6 +119,7 @@
if len(m.ID) > 0 {
// this is a response to a method call
id = string(m.ID)
c.log.Debugf("response to method call: %+v", m.ID)
} else if m.Method != "" {
// this is a subscription message
var s struct {
Expand All @@ -109,6 +130,7 @@
continue
}
id = fmt.Sprintf("%s:%d", m.Method, s.Subscription)
c.log.Debugf("subscription message: %v", id)
} else {
c.log.Errorf("WebsocketClient: cannot identify message: %s", receivedMsg.p)
continue
Expand All @@ -117,6 +139,7 @@
if ok {
readCh.(chan *jsonrpcMessage) <- m
} else {
// this can sometimes happen, but it's not an issue: the channel should be associated with the new id by now
c.log.Errorf("WebsocketClient: no reader for message: %s", receivedMsg.p)
continue
}
Expand All @@ -128,6 +151,37 @@
}
}

func (c *WebsocketClient) readMessage() (messageType int, p []byte, err error) {
if c.conn == nil {
return 0, nil, fmt.Errorf("connection is nil")
}

messageType, p, err = c.conn.ReadMessage()
if err != nil {
c.log.Warnf("read failed: %s", err)
if reconnErr := c.reconnect(context.Background()); reconnErr != nil {
return 0, nil, fmt.Errorf("read failed and reconnect failed: %w", err)
}
return c.readMessage()
}
return messageType, p, nil
}

func (c *WebsocketClient) writeMessage(messageType int, data []byte) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
}
err := c.conn.WriteMessage(messageType, data)
if err != nil {
c.log.Warnf("write failed: %s", err)
if reconnErr := c.reconnect(context.Background()); reconnErr != nil {
return fmt.Errorf("write failed and reconnect failed: %w", err)
}
return c.writeMessage(messageType, data)
}
return nil
}

func (c *WebsocketClient) writeMsg(method JsonRPCMethod, args ...interface{}) (string, error) {
msg, err := c.newMessage(method.String(), args...)
if err != nil {
Expand All @@ -140,6 +194,23 @@
return id, nil
}

type subscription struct {
method JsonRPCMethod
args []interface{}
id string
uuid uuid.UUID
}

var subscriptions = make([]*subscription, 0, 2)

type call struct {
method JsonRPCMethod
args []interface{}
id string
}

var pendingCalls sync.Map

func (c *WebsocketClient) CallContext(
ctx context.Context,
result interface{},
Expand All @@ -149,13 +220,22 @@
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}

id, err := c.writeMsg(method, args...)
if err != nil {
return err
}

pendingCalls.Store(id, &call{method: method, args: args, id: id})
defer func() {
pendingCalls.Delete(id)
}()

readCh, _ := c.readers.Load(id)
defer c.readers.Delete(id)
c.log.Debugf("waiting for response to %s", id)
respmsg := <-readCh.(chan *jsonrpcMessage)
c.log.Debugf("response to %s received", id)
if respmsg.Error != nil {
return respmsg.Error
}
Expand All @@ -180,6 +260,14 @@
readCh := make(chan *jsonrpcMessage)
c.readers.Store(id, readCh)

subscriptions = append(subscriptions, &subscription{
method: method,
args: args,
id: id,
uuid: uuid.New(),
})
c.log.Debugf("subscribing to %s", method)

go func() {
defer close(resultCh)
defer c.readers.Delete(id)
Expand All @@ -201,6 +289,7 @@
c.log.Errorf("could not unmarshal msg.Params: %s", err)
continue
}
c.log.Debugf("subscription result: %+v", params.Result)
resultCh <- params.Result
}
}
Expand Down Expand Up @@ -229,3 +318,111 @@
id := atomic.AddUint32(&c.idCounter, 1)
return strconv.FormatUint(uint64(id), 10)
}

func (c *WebsocketClient) reconnect(ctx context.Context) error {
c.log.Debugf("reconnecting")
if c.reconnectMx.TryLock() {
defer c.reconnectMx.Unlock()
} else {
// already reconnecting, try again later
time.Sleep(50 * time.Millisecond)
Copy link
Collaborator

@nnikolash nnikolash Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one thread hangs on reconnect, another will create 1 stack frame every 50ms, because e.g. writeMessage uses recursion after reconnect returns. Can this lead to stack overflow eventually?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in theory we could run out of memory but it will only happen if L1 is not accessible for a really long time, which means that wasp will not be working anyway. Besides, golang's stack is more flexible and can grow dynamically. So I'm not sure if it's a major issue

return nil
}

if c.conn != nil {
c.conn.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If read and write both will need to reconnect, would that mean that eventually they both will call this function?
I mean:

  1. read fail and tried to reconnect
  2. write fails and tries to reconnect but is required to wait for read to reconnect
  3. read reconnects -> resubscribes and recreated pending calls
  4. write "unfreezes" and is now also able to reconnect, and thus deletes again the connect (which might again trigger reconnect for read), and again resubscribes and recreates calls

I might be missing something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this scenario is possible but is very unlikely, it could only happen if second flow (write in your case) calls reconnect RIGHT after the first reconnect finishes and the lock is released. In most cases second connection will just be waiting 50ms and then reconnect using new connection

}

const retryInterval = time.Second
attempt := 1

for {
dialer := websocket.Dialer{}
conn, _, err := dialer.DialContext(ctx, c.url, nil)
if err != nil {
c.log.Warnf("connection attempt %d failed: %v", attempt, err)
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while reconnecting: %w", ctx.Err())
case <-time.After(retryInterval):
attempt++
continue
}
}

c.conn = conn
c.log.Debugf("new connection set after %d attempts", attempt)

// recreating subscriptions and recreating pending calls. This should happen asynchronously because it needs the loop to be running
go c.resubscribe(ctx)
go c.recreatePendingCalls()

return nil
}
}

// recreatePendingCalls recreates pending calls. Errors in this function will cause particular calls to not complete, so no need to fail other calls
func (c *WebsocketClient) recreatePendingCalls() {
pendingCalls.Range(func(key, value interface{}) bool {
call := value.(*call)
oldId := key.(string)

msg, err := c.newMessage(call.method.String(), call.args...)
if err != nil {
c.log.Errorf("failed to recreate pending call %s: %s", oldId, err)
return true
}

newId := string(msg.ID)

c.log.Debugf("recreate writing message: oldId: %s, newId: %s, %+v", oldId, newId, msg)

ch, ok := c.readers.Load(oldId)
if !ok {
c.log.Errorf("failed to recreate pending call: reader for old id %s not found", oldId)
return true
}
readCh := ch.(chan *jsonrpcMessage)
c.readers.Store(newId, readCh)
c.writeQueue <- msg

c.readers.Delete(oldId)

return true
})
}

// resubscribe to subscriptions. Errors in this function probably mean that subscription configurations themself contain errors, so ignoring
func (c *WebsocketClient) resubscribe(ctx context.Context) {
c.log.Debugf("resubscribing to %d subscriptions", len(subscriptions))
defer c.log.Debugf("resubscribed")

for _, sub := range subscriptions {
c.log.Debugf("resubscribing to %s, %+v", sub.method, sub.args)
defer c.log.Debugf("resubscribed to %s", sub.method)

Check failure on line 402 in clients/iota-go/iotaconn/websocket.go

View workflow job for this annotation

GitHub Actions / Lint

deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)

method := sub.method
args := sub.args
oldId := sub.id

var subID uint64
err := c.CallContext(ctx, &subID, method, args...)
if err != nil {
c.log.Errorf("failed to resubscribe to %s: %s", method, err)
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should return here with error? Why would we support subscribing for part of all subscriptions? Wouldn't hta be a faulty state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure. I though it makes sense to resubscribe to what we can anyway. Worst case scenario is that it just does not work

}
newId := fmt.Sprintf("%s:%d", method, subID)

// store reader channel with new id
ch, ok := c.readers.Load(oldId)
defer c.readers.Delete(oldId)

Check failure on line 418 in clients/iota-go/iotaconn/websocket.go

View workflow job for this annotation

GitHub Actions / Lint

deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)
if !ok {
c.log.Errorf("reader for old id %s not found", oldId)
continue
}
c.readers.Store(newId, ch)

// need to update subscription id so that next resubscribe works
sub.id = newId
}
}
Loading