
Conversation

@sawka sawka commented Jan 27, 2026

Lots of updates across all parts of the system to get this working. Big changes to routing, streaming, connection management, etc.

  • Persistent sessions behind a metadata flag for now
  • New backlog queue in the router to prevent hanging
  • Fix connection Close() issues that caused hangs when network was down
  • Fix issue with random routeids (they need to be generated fresh each time the JWT is used, not fixed) so you can run multiple wsh commands at once
  • Fix issue with domain sockets changing names across wave restarts (added a symlink mechanism to resolve new names)
  • ClientId caching in main server
  • Quick reorder queue for input to prevent out of order delivery across multiple hops
  • Fix out-of-order event delivery in router (remove unnecessary go routine creation)
  • Add environment testing and fix environment-variable handling for remote jobs (fetch env from connserver, add it to remote job starts)
  • Add new ConnServerInit() remote method to call before marking connection up
  • TODO -- remote file transfer needs to be fixed to not create OOM issues when transferring large files or directories

sawka added 20 commits January 22, 2026 14:46
…ge how shelljob controller handles start / force-flag
…e lock while disconnecting. also fix disconnection to close the conn first to prevent hanging
coderabbitai bot commented Jan 27, 2026

Code Review Summary

Walkthrough

This pull request introduces a comprehensive refactoring of client ID management, block controller lifecycle, and shell job execution architecture. Key changes include: replacing database lookups with cached client ID retrieval via wstore.GetClientId(); renaming StopBlockController to DestroyBlockController across the codebase; introducing ShellJobController for remote job orchestration; adding environment variable propagation mechanisms; implementing a QuickReorderQueue for input sequencing; and enhancing RPC infrastructure with backlog-aware message delivery and routing improvements. The frontend is updated to reflect new RPC commands (ConnServerInitCommand, ControllerDestroyCommand) while removing deprecated ones.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 13.39%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)
  • Title check — ✅ Passed: the title 'Persistent Terminal Sessions (+ improvements and bug fixes)' is partially related to the changeset. It highlights persistent sessions but uses a vague catch-all phrase '+ improvements and bug fixes' rather than capturing the full scope of changes.
  • Description check — ✅ Passed: the description is well-related to the changeset, detailing multiple major changes (backlog queue, routing fixes, environment handling, ConnServerInit, etc.) that align with the file modifications and objectives provided.



}

func CopyAndAddToEnvMap(envMap map[string]string, key string, val string) map[string]string {
newMap := make(map[string]string, len(envMap)+1)

Check failure

Code scanning / CodeQL

Size computation for allocation may overflow (High)

This operation, which is used in an allocation, involves a potentially large value and might overflow.

Copilot Autofix

AI about 7 hours ago

Conceptually, the fix is to avoid doing unchecked arithmetic on len(envMap) for the map allocation. Since the capacity argument to make(map[...], n) is only a hint and not required for correctness, the simplest and safest approach is to either (a) allocate without a capacity or (b) guard the addition with a math.MaxInt check. Removing the capacity hint slightly changes performance but not functionality; given this helper is only copying environment maps, the performance impact is negligible.

The best minimal‑change fix is to replace:

newMap := make(map[string]string, len(envMap)+1)

with a version that avoids the overflow risk while keeping roughly the same behavior. A simple pattern is:

newCap := len(envMap) + 1
if newCap < 0 {
    newCap = 0
}
newMap := make(map[string]string, newCap)

However, this still technically performs the potentially overflowing addition. The cleanest, warning‑free fix is simply to drop the capacity hint:

newMap := make(map[string]string)

Functionality is unchanged: the map still ends up containing all entries from envMap plus the new key/value. No additional imports are needed, and this change is entirely local to CopyAndAddToEnvMap in pkg/util/envutil/envutil.go around line 106–112.

Suggested changeset 1
pkg/util/envutil/envutil.go

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/pkg/util/envutil/envutil.go b/pkg/util/envutil/envutil.go
--- a/pkg/util/envutil/envutil.go
+++ b/pkg/util/envutil/envutil.go
@@ -104,7 +104,7 @@
 }
 
 func CopyAndAddToEnvMap(envMap map[string]string, key string, val string) map[string]string {
-	newMap := make(map[string]string, len(envMap)+1)
+	newMap := make(map[string]string)
 	for k, v := range envMap {
 		newMap[k] = v
 	}
EOF
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🤖 Fix all issues with AI agents
In `@pkg/blockcontroller/blockcontroller.go`:
- Around line 145-166: The selection logic for shouldUseShellJobController
incorrectly treats WSL connections as remote; update the condition that sets
shouldUseShellJobController so it excludes WSL-style connections (e.g., connName
starting with "wsl://") before marking a controller as ShellJobController. In
practice, modify the computation of shouldUseShellJobController (the variable
used in the controller-morphing switch and the second occurrence later) to
require isRemote && !isWSL (detect WSL by checking connName prefix "wsl://" or
via an existing conncontroller helper if available), so
ShellJobController.startNewJob is not chosen for WSL connections.

In `@pkg/jobcontroller/jobcontroller.go`:
- Around line 303-305: The current log call prints the raw environment map
(params.Env) which can leak secrets; modify the logging before calling
wshclient.RemoteStartJobCommand so it no longer logs params.Env directly—use a
redacted summary such as the number of env entries (len(params.Env)) or a
sanitized list of keys/allowed keys instead, keeping the existing log for jobId,
params.ConnName, params.Cmd, and params.Args; update the log statement that
references jobId and params.Env to instead log e.g. "env_count=%d" or
"env_keys=%v (redacted)" so only non-sensitive metadata is logged prior to
invoking RemoteStartJobCommand.

In `@pkg/remote/conncontroller/conncontroller.go`:
- Around line 526-530: The call to wshclient.ConnServerInitCommand currently
discards its returned error, so initialization failures are ignored; update the
code that invokes wshclient.ConnServerInitCommand (calling with
wshclient.GetBareRpcClient(), wshrpc.CommandConnServerInitData{ClientId:
wstore.GetClientId()}, &wshrpc.RpcOpts{Route: connRoute}) to capture the
returned error, and handle it by either returning the error from the surrounding
function or logging it with context (e.g., using the existing logger) and
aborting the connection flow; ensure the fix propagates the failure instead of
silently continuing for persistent sessions.

In `@pkg/shellexec/shellexec.go`:
- Line 14: The code is importing filepath and using filepath.Join to build the
remote ZDOTDIR (and other remote paths at the region referenced around lines
533-538) which produces Windows backslashes; replace usage of filepath.Join with
path.Join (import "path" instead of "path/filepath") wherever you construct
paths that will be interpreted on the remote shell (e.g., the remoteZDOTDIR
construction and any join calls inside the functions that assemble remote shell
commands), so remote paths always use POSIX forward slashes.

In `@pkg/utilds/quickreorderqueue.go`:
- Around line 204-226: The timeout flush currently only tracks
highestTimedOutSeq and compares seqNum, ignoring session boundaries; change it
to track the highest timed-out item as a (sessionId, seqNum) pair (or simply the
item) and use the existing comparator cmpSessionSeq_withlock to decide which
buffered items to flush. Specifically, find the highestTimedOutItem from
q.buffer where now.Sub(item.timestamp) >= q.timeout, then iterate q.buffer and
for each item call cmpSessionSeq_withlock(item, highestTimedOutItem) (or
equivalent) to determine <= 0 to enqueue via q.outCh, and update q.nextSeqNum
using item.seqNum only when the session comparison indicates it belongs to the
advanced session/seq. Ensure you reference q.buffer, q.timeout,
cmpSessionSeq_withlock, q.outCh, and q.nextSeqNum when making the change.

In `@pkg/waveapp/waveapp.go`:
- Around line 184-188: AuthenticateCommand may return an empty
authRtnData.RouteId so update the code to validate/fallback and keep RpcContext
consistent: after calling AuthenticateCommand, check if authRtnData.RouteId is
empty/nil and if so set the chosenRouteId to the existing rpcCtx.RouteId (or
other previous value), then assign that chosenRouteId to both client.RouteId and
client.RpcContext.RouteId (ensuring client.RpcContext is non-nil) so the client
and its RpcContext remain in sync.

In `@pkg/web/ws.go`:
- Line 166: processMessage calls into processWSCommand which currently does a
synchronous send to rpcInputCh (FromRemoteCh) inside the ReadLoop, risking
blocking websocket reads when the 32-slot buffer fills; change the send in
processWSCommand to avoid blocking by using a select: try the non-blocking send
case to rpcInputCh, include a timeout case (time.After) to give a short window
for the send, and a default/failure branch that logs a warning and drops or
returns an error to the caller so ReadLoop is not blocked; update any callers
(e.g., processMessage) to handle the possible error/false return, and consider
increasing the buffer size only after measuring throughput.
🧹 Nitpick comments (8)
pkg/remote/conncontroller/conncontroller.go (1)

300-362: Consider adding timeouts to SSH session operations.

The session.Run("env -0") calls in both getEnvironmentNoPty and getEnvironmentWithPty have no timeout. On an unresponsive remote host, these could block indefinitely. Since these methods are for testing, this may be acceptable, but adding a context-based timeout would improve reliability.

💡 Suggested approach using context with timeout
func (conn *SSHConn) getEnvironmentNoPty(ctx context.Context, client *ssh.Client) (map[string]string, error) {
	session, err := client.NewSession()
	if err != nil {
		return nil, fmt.Errorf("unable to create ssh session: %w", err)
	}
	defer session.Close()

	outputBuf := &strings.Builder{}
	session.Stdout = outputBuf
	session.Stderr = outputBuf

	// Run with context timeout
	done := make(chan error, 1)
	go func() {
		done <- session.Run("env -0")
	}()
	
	select {
	case err := <-done:
		if err != nil {
			return nil, fmt.Errorf("error running env command: %w", err)
		}
	case <-ctx.Done():
		session.Close()
		return nil, ctx.Err()
	}

	return envutil.EnvToMap(outputBuf.String()), nil
}
pkg/wcore/wcore.go (1)

148-161: Consider guarding against uninitialized client ID.

If GetClientId() returns an empty string, the telemetry update would be sent with an empty clientId. While this is unlikely in practice (since GoSendNoTelemetryUpdate is called after initialization), adding a guard would make the code more robust:

clientId := wstore.GetClientId()
if clientId == "" {
    log.Printf("[warning] skipping no-telemetry update: client not initialized\n")
    return
}
pkg/utilds/quickreorderqueue_test.go (1)

85-245: Consider relaxing short sleeps/timeouts to reduce flake risk.

Several tests rely on 50–200ms windows; under CI load these can intermittently fail. Longer windows or a controllable clock would be more robust.

pkg/wshrpc/wshremote/wshremote.go (1)

100-124: Consider propagating filesystem errors instead of swallowing them.

The method returns nil after logging warnings when MkdirAll or Symlink fails (lines 113-121). While this resilience approach allows the connection to proceed, it may silently hide permission issues or filesystem problems that could cause downstream failures.

Consider returning the error or at least distinguishing between recoverable and non-recoverable failures. Alternatively, if this is intentional for resilience, a brief code comment explaining the design choice would help future maintainers.

pkg/jobmanager/jobmanager.go (2)

211-296: Consider reducing lock scope for better concurrency.

The method holds jm.lock for its entire duration (lines 212-213), which includes potentially slow operations like MakeJobCmd, stream attachment, and process info retrieval via gopsutil. This could block other operations on the JobManager during job startup.

Consider restructuring to release the lock after checking/setting jm.Cmd, perform the slow operations, then re-acquire to finalize state if needed. However, this may require careful handling of concurrent StartJob calls.


247-264: Minor: GetCmd() is called twice.

jobCmd.GetCmd() is called at line 247 (for the PTY) and again at line 260 (for the cmd). Consider storing both results from a single call:

♻️ Suggested simplification
-	_, cmdPty := jobCmd.GetCmd()
-	if cmdPty != nil {
+	cmd, cmdPty := jobCmd.GetCmd()
+	if cmdPty != nil {
 		log.Printf("StartJob: attaching pty reader to stream manager")
 		err = jm.StreamManager.AttachReader(cmdPty)
 		if err != nil {
 			log.Printf("StartJob: failed to attach reader: %v", err)
 			return nil, fmt.Errorf("failed to attach reader to stream manager: %w", err)
 		}
 		log.Printf("StartJob: pty reader attached successfully")
 	} else {
 		log.Printf("StartJob: no pty to attach")
 	}

-	cmd, _ := jobCmd.GetCmd()
 	if cmd == nil || cmd.Process == nil {
pkg/wshutil/wshrouter.go (1)

489-502: Consider adding a shutdown mechanism for the backlog processor.

The processBacklog goroutine runs an infinite loop with no way to stop it gracefully. While the router likely lives for the application lifetime, adding a done channel or checking a closed flag would make the code more robust and testable.

pkg/blockcontroller/blockcontroller.go (1)

16-21: Prevent user env from overriding internal WAVETERM_* values.
resolveEnvMap is merged after the internal WAVETERM_* fields, so user-provided env can override internal identity fields. Consider merging env first, then stamping the WAVETERM_* keys. Also, if not already done, please confirm the github.com/google/uuid version is pinned/vetted.

♻️ Proposed refactor
 token := &shellutil.TokenSwapEntry{
 	Token: uuid.New().String(),
 	Env:   make(map[string]string),
 	Exp:   time.Now().Add(5 * time.Minute),
 }
-token.Env["TERM_PROGRAM"] = "waveterm"
-token.Env["WAVETERM_BLOCKID"] = blockId
-token.Env["WAVETERM_VERSION"] = wavebase.WaveVersion
-token.Env["WAVETERM"] = "1"
+envMap, err := resolveEnvMap(blockId, blockMeta, remoteName)
+if err != nil {
+	log.Printf("error resolving env map: %v\n", err)
+}
+for k, v := range envMap {
+	token.Env[k] = v
+}
+token.Env["TERM_PROGRAM"] = "waveterm"
+token.Env["WAVETERM_BLOCKID"] = blockId
+token.Env["WAVETERM_VERSION"] = wavebase.WaveVersion
+token.Env["WAVETERM"] = "1"
 ...
-token.Env["WAVETERM_CLIENTID"] = wstore.GetClientId()
-token.Env["WAVETERM_CONN"] = remoteName
-envMap, err := resolveEnvMap(blockId, blockMeta, remoteName)
-if err != nil {
-	log.Printf("error resolving env map: %v\n", err)
-}
-for k, v := range envMap {
-	token.Env[k] = v
-}
+token.Env["WAVETERM_CLIENTID"] = wstore.GetClientId()
+token.Env["WAVETERM_CONN"] = remoteName

Also applies to: 409-444

Comment on lines +145 to 166
// Determine if we should use ShellJobController vs ShellController
isPersistent := blockData.Meta.GetBool(waveobj.MetaKey_CmdPersistent, false)
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
isRemote := !conncontroller.IsLocalConnName(connName)
shouldUseShellJobController := isPersistent && isRemote && (controllerName == BlockController_Shell || controllerName == BlockController_Cmd)

// Check if we need to morph controller type
if existing != nil {
existingStatus := existing.GetRuntimeStatus()
needsReplace := false

// Determine if existing controller type matches what we need
switch existing.(type) {
case *ShellController:
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
needsReplace = true
} else if shouldUseShellJobController {
needsReplace = true
}
case *ShellJobController:
if !shouldUseShellJobController {
needsReplace = true
}

⚠️ Potential issue | 🟠 Major

Exclude WSL from ShellJobController selection.
shouldUseShellJobController treats wsl:// as remote, but ShellJobController.startNewJob parses SSH opts and will fail for WSL connections. Persistent WSL shells would fail to start.

🐛 Proposed fix
-	isRemote := !conncontroller.IsLocalConnName(connName)
-	shouldUseShellJobController := isPersistent && isRemote && (controllerName == BlockController_Shell || controllerName == BlockController_Cmd)
+	isRemote := !conncontroller.IsLocalConnName(connName)
+	isWsl := strings.HasPrefix(connName, "wsl://")
+	shouldUseShellJobController := isPersistent && isRemote && !isWsl &&
+		(controllerName == BlockController_Shell || controllerName == BlockController_Cmd)

Also applies to: 220-224

🤖 Prompt for AI Agents
In `@pkg/blockcontroller/blockcontroller.go` around lines 145 - 166, The selection
logic for shouldUseShellJobController incorrectly treats WSL connections as
remote; update the condition that sets shouldUseShellJobController so it
excludes WSL-style connections (e.g., connName starting with "wsl://") before
marking a controller as ShellJobController. In practice, modify the computation
of shouldUseShellJobController (the variable used in the controller-morphing
switch and the second occurrence later) to require isRemote && !isWSL (detect
WSL by checking connName prefix "wsl://" or via an existing conncontroller
helper if available), so ShellJobController.startNewJob is not chosen for WSL
connections.

Comment on lines +303 to 305
log.Printf("[job:%s] sending RemoteStartJobCommand to connection %s, cmd=%q, args=%v", jobId, params.ConnName, params.Cmd, params.Args)
log.Printf("[job:%s] env=%v", jobId, params.Env)
rtnData, err := wshclient.RemoteStartJobCommand(bareRpc, startJobData, rpcOpts)

⚠️ Potential issue | 🟠 Major

Avoid logging raw env maps (may leak secrets/PII).

Line 303-305 logs the full environment, which often includes credentials and sensitive data. Please redact or log only counts/keys.

🔧 Suggested fix (log only count)
-log.Printf("[job:%s] env=%v", jobId, params.Env)
+log.Printf("[job:%s] env vars=%d", jobId, len(params.Env))
🤖 Prompt for AI Agents
In `@pkg/jobcontroller/jobcontroller.go` around lines 303 - 305, The current log
call prints the raw environment map (params.Env) which can leak secrets; modify
the logging before calling wshclient.RemoteStartJobCommand so it no longer logs
params.Env directly—use a redacted summary such as the number of env entries
(len(params.Env)) or a sanitized list of keys/allowed keys instead, keeping the
existing log for jobId, params.ConnName, params.Cmd, and params.Args; update the
log statement that references jobId and params.Env to instead log e.g.
"env_count=%d" or "env_keys=%v (redacted)" so only non-sensitive metadata is
logged prior to invoking RemoteStartJobCommand.

Comment on lines +526 to +530
wshclient.ConnServerInitCommand(
wshclient.GetBareRpcClient(),
wshrpc.CommandConnServerInitData{ClientId: wstore.GetClientId()},
&wshrpc.RpcOpts{Route: connRoute},
)

⚠️ Potential issue | 🟡 Minor

Error from ConnServerInitCommand is silently ignored.

The return value of ConnServerInitCommand is discarded. If this initialization fails, the connection may appear successful but behave incorrectly for persistent sessions. Consider logging or returning the error.

🐛 Proposed fix
-	wshclient.ConnServerInitCommand(
+	err = wshclient.ConnServerInitCommand(
 		wshclient.GetBareRpcClient(),
 		wshrpc.CommandConnServerInitData{ClientId: wstore.GetClientId()},
 		&wshrpc.RpcOpts{Route: connRoute},
 	)
+	if err != nil {
+		conn.Infof(ctx, "warning: connserver init failed: %v\n", err)
+		// Continue anyway - connection may still work for non-persistent operations
+	}
🤖 Prompt for AI Agents
In `@pkg/remote/conncontroller/conncontroller.go` around lines 526 - 530, The call
to wshclient.ConnServerInitCommand currently discards its returned error, so
initialization failures are ignored; update the code that invokes
wshclient.ConnServerInitCommand (calling with wshclient.GetBareRpcClient(),
wshrpc.CommandConnServerInitData{ClientId: wstore.GetClientId()},
&wshrpc.RpcOpts{Route: connRoute}) to capture the returned error, and handle it
by either returning the error from the surrounding function or logging it with
context (e.g., using the existing logger) and aborting the connection flow;
ensure the fix propagates the failure instead of silently continuing for
persistent sessions.

"log"
"os"
"os/exec"
"path/filepath"

⚠️ Potential issue | 🟡 Minor

Use POSIX path joining for remote ZDOTDIR.

filepath.Join will emit backslashes on Windows hosts, which can break the remote zsh path when the remote is Unix-like. Prefer path.Join (or manual /) for remote paths.

💡 Proposed fix
-import (
+import (
 	"bytes"
 	"context"
 	"fmt"
 	"io"
 	"log"
 	"os"
 	"os/exec"
-	"path/filepath"
+	"path"
 	"runtime"
 	"strings"
 	"sync"
 	"syscall"
 	"time"
@@
-		zshDir := filepath.Join(remoteInfo.HomeDir, ".waveterm", shellutil.ZshIntegrationDir)
+		zshDir := path.Join(remoteInfo.HomeDir, ".waveterm", shellutil.ZshIntegrationDir)

Also applies to: 533-538

🤖 Prompt for AI Agents
In `@pkg/shellexec/shellexec.go` at line 14, The code is importing filepath and
using filepath.Join to build the remote ZDOTDIR (and other remote paths at the
region referenced around lines 533-538) which produces Windows backslashes;
replace usage of filepath.Join with path.Join (import "path" instead of
"path/filepath") wherever you construct paths that will be interpreted on the
remote shell (e.g., the remoteZDOTDIR construction and any join calls inside the
functions that assemble remote shell commands), so remote paths always use POSIX
forward slashes.

Comment on lines +204 to +226
highestTimedOutSeq := -1
for _, item := range q.buffer {
if now.Sub(item.timestamp) >= q.timeout {
highestTimedOutSeq = item.seqNum
}
}

if highestTimedOutSeq == -1 {
return
}

enqueued := 0
for i, item := range q.buffer {
if item.seqNum <= highestTimedOutSeq {
q.outCh <- item.data
enqueued = i + 1
if item.seqNum >= q.nextSeqNum {
q.nextSeqNum = item.seqNum + 1
}
} else {
break
}
}

⚠️ Potential issue | 🟠 Major

Potential bug: onTimeout flush logic ignores session boundaries.

The timeout flush at lines 215-226 compares only seqNum without considering sessionId. Since the buffer is sorted by (sessionTimestamp, seqNum), items from an earlier session could have higher sequence numbers than items from a later session.

For example:

  • Buffer contains: [{sessionA, seq=10}, {sessionB, seq=2}] (sorted by session timestamp)
  • If sessionB's item times out, highestTimedOutSeq = 2
  • Line 217 checks item.seqNum <= 2, which would NOT emit sessionA's seq=10 even though it's earlier in the sorted order

The flush should either:

  1. Use the same cmpSessionSeq_withlock comparison, or
  2. Track highest timed-out (session, seq) pair instead of just seq
🐛 Suggested fix
-	highestTimedOutSeq := -1
+	var highestTimedOut *queuedItem[T]
 	for _, item := range q.buffer {
 		if now.Sub(item.timestamp) >= q.timeout {
-			highestTimedOutSeq = item.seqNum
+			itemCopy := item
+			highestTimedOut = &itemCopy
 		}
 	}

-	if highestTimedOutSeq == -1 {
+	if highestTimedOut == nil {
 		return
 	}

 	enqueued := 0
 	for i, item := range q.buffer {
-		if item.seqNum <= highestTimedOutSeq {
+		if q.cmpSessionSeq_withlock(item.sessionId, item.seqNum, highestTimedOut.sessionId, highestTimedOut.seqNum) <= 0 {
 			q.outCh <- item.data
 			enqueued = i + 1
 			if item.seqNum >= q.nextSeqNum {
 				q.nextSeqNum = item.seqNum + 1
 			}
🤖 Prompt for AI Agents
In `@pkg/utilds/quickreorderqueue.go` around lines 204 - 226, The timeout flush
currently only tracks highestTimedOutSeq and compares seqNum, ignoring session
boundaries; change it to track the highest timed-out item as a (sessionId,
seqNum) pair (or simply the item) and use the existing comparator
cmpSessionSeq_withlock to decide which buffered items to flush. Specifically,
find the highestTimedOutItem from q.buffer where now.Sub(item.timestamp) >=
q.timeout, then iterate q.buffer and for each item call
cmpSessionSeq_withlock(item, highestTimedOutItem) (or equivalent) to determine
<= 0 to enqueue via q.outCh, and update q.nextSeqNum using item.seqNum only when
the session comparison indicates it belongs to the advanced session/seq. Ensure
you reference q.buffer, q.timeout, cmpSessionSeq_withlock, q.outCh, and
q.nextSeqNum when making the change.

Comment on lines +184 to +188
authRtnData, err := wshclient.AuthenticateCommand(client.RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return fmt.Errorf("error authenticating rpc connection: %v", err)
}
client.RouteId = rpcCtx.RouteId
client.RouteId = authRtnData.RouteId

⚠️ Potential issue | 🟠 Major

Guard against empty RouteId and keep RpcContext consistent.
If AuthenticateCommand returns a nil/empty RouteId, client.RouteId becomes empty (previously it used rpcCtx.RouteId), and client.RpcContext.RouteId remains stale. Recommend validating/fallbacking and updating both.

🛠️ Proposed fix
 authRtnData, err := wshclient.AuthenticateCommand(client.RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
 if err != nil {
 	return fmt.Errorf("error authenticating rpc connection: %v", err)
 }
- client.RouteId = authRtnData.RouteId
+ routeId := rpcCtx.RouteId
+ if authRtnData != nil && authRtnData.RouteId != "" {
+ 	routeId = authRtnData.RouteId
+ }
+ if routeId == "" {
+ 	return fmt.Errorf("authenticate did not return route id")
+ }
+ client.RouteId = routeId
+ client.RpcContext.RouteId = routeId
🤖 Prompt for AI Agents
In `@pkg/waveapp/waveapp.go` around lines 184 - 188, AuthenticateCommand may
return an empty authRtnData.RouteId so update the code to validate/fallback and
keep RpcContext consistent: after calling AuthenticateCommand, check if
authRtnData.RouteId is empty/nil and if so set the chosenRouteId to the existing
rpcCtx.RouteId (or other previous value), then assign that chosenRouteId to both
client.RouteId and client.RpcContext.RouteId (ensuring client.RpcContext is
non-nil) so the client and its RpcContext remain in sync.

continue
}
go processMessage(jmsg, outputCh, rpcInputCh)
processMessage(jmsg, outputCh, rpcInputCh)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find WshRpcProxy definition and channel definitions
echo "=== Searching for WshRpcProxy and channel definitions ==="
rg -n "type WshRpcProxy" pkg -A 10

echo -e "\n=== Searching for FromRemoteCh channel creation (make) ==="
rg -n "FromRemoteCh\s*:=" pkg -B 2 -A 2

echo -e "\n=== Searching for FromRemoteCh usage patterns ==="
rg -n "FromRemoteCh" pkg -C 3

echo -e "\n=== Searching for MakeRpcProxy function ==="
rg -n "func MakeRpcProxy|func.*NewRpcProxy" pkg -A 15

Repository: wavetermdev/waveterm

Length of output: 5382


🏁 Script executed:

#!/bin/bash
# Find DefaultOutputChSize and DefaultInputChSize values
echo "=== Searching for DefaultOutputChSize and DefaultInputChSize ==="
rg -n "DefaultOutputChSize|DefaultInputChSize" pkg -B 2

echo -e "\n=== Searching for ReadLoop function signature in ws.go ==="
rg -n "func ReadLoop" pkg/web/ws.go -A 20

echo -e "\n=== Searching for processMessage calls in ReadLoop ==="
rg -n "processMessage" pkg/web/ws.go -B 3 -A 3

Repository: wavetermdev/waveterm

Length of output: 3863


🏁 Script executed:

#!/bin/bash
# Show full processMessage implementation
echo "=== Full processMessage implementation ==="
rg -n "func processMessage" pkg/web/ws.go -A 50

echo -e "\n=== Searching for rpcInputCh sends/writes ==="
rg -n "rpcInputCh\s*<-" pkg/web/ws.go

Repository: wavetermdev/waveterm

Length of output: 1955


🏁 Script executed:

#!/bin/bash
# Show processWSCommand implementation around line 125
echo "=== processWSCommand implementation ==="
rg -n "func processWSCommand" pkg/web/ws.go -A 30

echo -e "\n=== Context around rpcInputCh send at line 125 ==="
sed -n '100,135p' pkg/web/ws.go

Repository: wavetermdev/waveterm

Length of output: 2023


Synchronous rpcInputCh send in ReadLoop can block websocket reads.

The processWSCommand function sends to rpcInputCh (an alias for FromRemoteCh) synchronously without a timeout. While the channel is buffered with 32 slots, if messages arrive faster than the RPC consumer processes them, the buffer will fill and the send will block the ReadLoop, preventing all websocket message processing until the consumer catches up. This degrades websocket responsiveness under load.

Consider either:

  • Using a non-blocking send with a select/default clause and error handling for full buffer
  • Adding a timeout to the send operation
  • Measuring consumer throughput to confirm buffer size is adequate, or increasing it if needed
🤖 Prompt for AI Agents
In `@pkg/web/ws.go` at line 166, processMessage calls into processWSCommand which
currently does a synchronous send to rpcInputCh (FromRemoteCh) inside the
ReadLoop, risking blocking websocket reads when the 32-slot buffer fills; change
the send in processWSCommand to avoid blocking by using a select: try the
non-blocking send case to rpcInputCh, include a timeout case (time.After) to
give a short window for the send, and a default/failure branch that logs a
warning and drops or returns an error to the caller so ReadLoop is not blocked;
update any callers (e.g., processMessage) to handle the possible error/false
return, and consider increasing the buffer size only after measuring throughput.

