Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
42ccaac
capture the initial environment in wshremote.ServerImpl
sawka Jan 22, 2026
1ec2d39
processmessage no longer needs to be async. this is much cleaner, an…
sawka Jan 22, 2026
a687604
use env definitively, dont combine with os.Environ()
sawka Jan 22, 2026
970a10a
combine with initialenv in connserver
sawka Jan 22, 2026
89bff89
remote jobs are working!
sawka Jan 23, 2026
b872db7
working on a shelljob controller
sawka Jan 23, 2026
ee67e2e
simplify input
sawka Jan 23, 2026
1cc0fd2
checkpoint, fix status versioning, destroy controllers instead of reu…
sawka Jan 23, 2026
00a70ce
working on not reusing block controllers
sawka Jan 23, 2026
2fbc1a2
remove unused types/uuid, rename jobdebug exit to terminate, and chan…
sawka Jan 24, 2026
a591e46
checkpoint on implementing persistent remote shell sessions
sawka Jan 24, 2026
43cc19b
added a backlog queue to wshrouter to prevent hanging when remote con…
sawka Jan 24, 2026
75fd426
fix swaptoken, add waveterm_jobid, wsh now working in persistent sess…
sawka Jan 26, 2026
a339d40
quick reorder queue for input events in the job manager...
sawka Jan 27, 2026
2af8a1f
cache term size changes in jobcontroller, resend on prepare connect, …
sawka Jan 27, 2026
ad3b047
fix initial resize for shelljobs
sawka Jan 27, 2026
ae24db5
resolve symlinks for sockname...
sawka Jan 27, 2026
ad522e0
updates for new cached clientid
sawka Jan 27, 2026
89d8506
lots of fixes. job lifecycle hooked up to conns, multiple wsh comman…
sawka Jan 27, 2026
a3c5e0e
fix big blocking disconnect bug with connections. now we dont hold t…
sawka Jan 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions cmd/server/main-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/service"
"github.com/wavetermdev/waveterm/pkg/telemetry"
"github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata"
"github.com/wavetermdev/waveterm/pkg/util/envutil"
"github.com/wavetermdev/waveterm/pkg/util/shellutil"
"github.com/wavetermdev/waveterm/pkg/util/sigutil"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
Expand Down Expand Up @@ -79,7 +80,7 @@ func doShutdown(reason string) {
log.Printf("shutting down: %s\n", reason)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
go blockcontroller.StopAllBlockControllers()
go blockcontroller.StopAllBlockControllersForShutdown()
shutdownActivityUpdate()
sendTelemetryWrapper()
// TODO deal with flush in progress
Expand Down Expand Up @@ -162,15 +163,9 @@ func sendDiagnosticPing() bool {
if err != nil || !isOnline {
return false
}
clientData, err := wstore.DBGetSingleton[*waveobj.Client](ctx)
if err != nil {
return false
}
if clientData == nil {
return false
}
clientId := wstore.GetClientId()
usageTelemetry := telemetry.IsTelemetryEnabled()
wcloud.SendDiagnosticPing(ctx, clientData.OID, usageTelemetry)
wcloud.SendDiagnosticPing(ctx, clientId, usageTelemetry)
return true
}

Expand Down Expand Up @@ -226,12 +221,8 @@ func sendTelemetryWrapper() {
ctx, cancelFn := context.WithTimeout(context.Background(), 15*time.Second)
defer cancelFn()
beforeSendActivityUpdate(ctx)
client, err := wstore.DBGetSingleton[*waveobj.Client](ctx)
if err != nil {
log.Printf("[error] getting client data for telemetry: %v\n", err)
return
}
err = wcloud.SendAllTelemetry(client.OID)
clientId := wstore.GetClientId()
err := wcloud.SendAllTelemetry(clientId)
if err != nil {
log.Printf("[error] sending telemetry: %v\n", err)
}
Expand Down Expand Up @@ -392,7 +383,10 @@ func createMainWshClient() {
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute)
wps.Broker.SetClient(wshutil.DefaultRouter)
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, wshremote.MakeRemoteRpcServerImpl(nil, wshutil.DefaultRouter, wshclient.GetBareRpcClient(), true), "conn:local")
localInitialEnv := envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ()))
sockName := wavebase.GetDomainSocketName()
remoteImpl := wshremote.MakeRemoteRpcServerImpl(nil, wshutil.DefaultRouter, wshclient.GetBareRpcClient(), true, localInitialEnv, sockName)
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, remoteImpl, "conn:local")
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName))
}
Expand Down
41 changes: 27 additions & 14 deletions cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/baseds"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
"github.com/wavetermdev/waveterm/pkg/util/envutil"
"github.com/wavetermdev/waveterm/pkg/util/packetparser"
"github.com/wavetermdev/waveterm/pkg/util/sigutil"
"github.com/wavetermdev/waveterm/pkg/wavebase"
Expand All @@ -42,6 +43,7 @@ var connServerRouterDomainSocket bool
var connServerConnName string
var connServerDev bool
var ConnServerWshRouter *wshutil.WshRouter
var connServerInitialEnv map[string]string

func init() {
serverCmd.Flags().BoolVar(&connServerRouter, "router", false, "run in local router mode (stdio upstream)")
Expand Down Expand Up @@ -120,18 +122,18 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) {
}
}

func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter) (*wshutil.WshRpc, error) {
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, error) {
routeId := wshutil.MakeConnectionRouteId(connServerConnName)
rpcCtx := wshrpc.RpcContext{
RouteId: routeId,
Conn: connServerConnName,
}

bareRouteId := wshutil.MakeRandomProcRouteId()
bareClient := wshutil.MakeWshRpc(wshrpc.RpcContext{}, &wshclient.WshServer{}, bareRouteId)
router.RegisterTrustedLeaf(bareClient, bareRouteId)
connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false), routeId)

connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false, connServerInitialEnv, sockName), routeId)
router.RegisterTrustedLeaf(connServerClient, routeId)
return connServerClient, nil
}
Expand Down Expand Up @@ -170,8 +172,10 @@ func serverRunRouter() error {
}()
router.RegisterUpstream(termProxy)

sockName := getRemoteDomainSocketName()

// setup the connserver rpc client first
client, err := setupConnServerRpcClientWithRouter(router)
client, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
Expand Down Expand Up @@ -267,23 +271,19 @@ func serverRunRouterDomainSocket(jwtToken string) error {
// register the domain socket connection as upstream
router.RegisterUpstream(upstreamProxy)

// setup the connserver rpc client (leaf)
client, err := setupConnServerRpcClientWithRouter(router)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
// use the router's control RPC to authenticate with upstream
controlRpc := router.GetControlRpc()

// authenticate with the upstream router using the JWT
_, err = wshclient.AuthenticateCommand(client, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
_, err = wshclient.AuthenticateCommand(controlRpc, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRootRoute})
if err != nil {
return fmt.Errorf("error authenticating with upstream: %v", err)
}
log.Printf("authenticated with upstream router")

// fetch and set JWT public key
log.Printf("trying to get JWT public key")
jwtPublicKeyB64, err := wshclient.GetJwtPublicKeyCommand(client, nil)
jwtPublicKeyB64, err := wshclient.GetJwtPublicKeyCommand(controlRpc, nil)
if err != nil {
return fmt.Errorf("error getting jwt public key: %v", err)
}
Expand All @@ -297,6 +297,13 @@ func serverRunRouterDomainSocket(jwtToken string) error {
}
log.Printf("got JWT public key")

// now setup the connserver rpc client
client, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client

// set up the local domain socket listener for local wsh commands
unixListener, err := MakeRemoteUnixListener()
if err != nil {
Expand All @@ -323,7 +330,11 @@ func serverRunRouterDomainSocket(jwtToken string) error {
}

func serverRunNormal(jwtToken string) error {
err := setupRpcClient(wshremote.MakeRemoteRpcServerImpl(os.Stdout, nil, nil, false), jwtToken)
sockName, err := wshutil.ExtractUnverifiedSocketName(jwtToken)
if err != nil {
return fmt.Errorf("error extracting socket name from JWT: %v", err)
}
err = setupRpcClient(wshremote.MakeRemoteRpcServerImpl(os.Stdout, nil, nil, false, connServerInitialEnv, sockName), jwtToken)
if err != nil {
return err
}
Expand Down Expand Up @@ -359,6 +370,8 @@ func askForJwtToken() (string, error) {
}

func serverRun(cmd *cobra.Command, args []string) error {
connServerInitialEnv = envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ()))

var logFile *os.File
if connServerDev {
var err error
Expand Down
51 changes: 36 additions & 15 deletions cmd/wsh/cmd/wshcmd-jobdebug.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ var jobDebugPruneCmd = &cobra.Command{
RunE: jobDebugPruneRun,
}

var jobDebugExitCmd = &cobra.Command{
Use: "exit",
Short: "exit a job manager",
RunE: jobDebugExitRun,
var jobDebugTerminateCmd = &cobra.Command{
Use: "terminate",
Short: "terminate a job manager",
RunE: jobDebugTerminateRun,
}

var jobDebugDisconnectCmd = &cobra.Command{
Expand Down Expand Up @@ -96,7 +96,7 @@ var jobDebugDetachJobCmd = &cobra.Command{
var jobIdFlag string
var jobDebugJsonFlag bool
var jobConnFlag string
var exitJobIdFlag string
var terminateJobIdFlag string
var disconnectJobIdFlag string
var reconnectJobIdFlag string
var reconnectConnNameFlag string
Expand All @@ -110,7 +110,7 @@ func init() {
jobDebugCmd.AddCommand(jobDebugDeleteCmd)
jobDebugCmd.AddCommand(jobDebugDeleteAllCmd)
jobDebugCmd.AddCommand(jobDebugPruneCmd)
jobDebugCmd.AddCommand(jobDebugExitCmd)
jobDebugCmd.AddCommand(jobDebugTerminateCmd)
jobDebugCmd.AddCommand(jobDebugDisconnectCmd)
jobDebugCmd.AddCommand(jobDebugReconnectCmd)
jobDebugCmd.AddCommand(jobDebugReconnectConnCmd)
Expand All @@ -124,8 +124,8 @@ func init() {
jobDebugDeleteCmd.Flags().StringVar(&jobIdFlag, "jobid", "", "job id to delete (required)")
jobDebugDeleteCmd.MarkFlagRequired("jobid")

jobDebugExitCmd.Flags().StringVar(&exitJobIdFlag, "jobid", "", "job id to exit (required)")
jobDebugExitCmd.MarkFlagRequired("jobid")
jobDebugTerminateCmd.Flags().StringVar(&terminateJobIdFlag, "jobid", "", "job id to terminate (required)")
jobDebugTerminateCmd.MarkFlagRequired("jobid")

jobDebugDisconnectCmd.Flags().StringVar(&disconnectJobIdFlag, "jobid", "", "job id to disconnect (required)")
jobDebugDisconnectCmd.MarkFlagRequired("jobid")
Expand Down Expand Up @@ -176,12 +176,15 @@ func jobDebugListRun(cmd *cobra.Command, args []string) error {
return nil
}

fmt.Printf("%-36s %-20s %-9s %-10s %-30s %-8s %-10s\n", "OID", "Connection", "Connected", "Manager", "Cmd", "ExitCode", "Stream")
fmt.Printf("%-36s %-20s %-9s %-10s %-6s %-30s %-8s %-10s %-8s\n", "OID", "Connection", "Connected", "Manager", "Reason", "Cmd", "ExitCode", "Stream", "Attached")
for _, job := range rtnData {
connectedStatus := "no"
if connectedMap[job.OID] {
connectedStatus = "yes"
}
if job.TerminateOnReconnect {
connectedStatus += "*"
}

streamStatus := "-"
if job.StreamDone {
Expand All @@ -203,8 +206,26 @@ func jobDebugListRun(cmd *cobra.Command, args []string) error {
}
}

fmt.Printf("%-36s %-20s %-9s %-10s %-30s %-8s %-10s\n",
job.OID, job.Connection, connectedStatus, job.JobManagerStatus, job.Cmd, exitCode, streamStatus)
doneReason := "-"
if job.JobManagerDoneReason == "startuperror" {
doneReason = "serr"
} else if job.JobManagerDoneReason == "gone" {
doneReason = "gone"
} else if job.JobManagerDoneReason == "terminated" {
doneReason = "term"
}

attachedBlock := "-"
if job.AttachedBlockId != "" {
if len(job.AttachedBlockId) >= 8 {
attachedBlock = job.AttachedBlockId[:8]
} else {
attachedBlock = job.AttachedBlockId
}
}

fmt.Printf("%-36s %-20s %-9s %-10s %-6s %-30s %-8s %-10s %-8s\n",
job.OID, job.Connection, connectedStatus, job.JobManagerStatus, doneReason, job.Cmd, exitCode, streamStatus, attachedBlock)
}
return nil
}
Expand Down Expand Up @@ -275,13 +296,13 @@ func jobDebugPruneRun(cmd *cobra.Command, args []string) error {
return nil
}

func jobDebugExitRun(cmd *cobra.Command, args []string) error {
err := wshclient.JobControllerExitJobCommand(RpcClient, exitJobIdFlag, nil)
func jobDebugTerminateRun(cmd *cobra.Command, args []string) error {
err := wshclient.JobControllerExitJobCommand(RpcClient, terminateJobIdFlag, nil)
if err != nil {
return fmt.Errorf("exiting job manager: %w", err)
return fmt.Errorf("terminating job manager: %w", err)
}

fmt.Printf("Job manager for %s exited successfully\n", exitJobIdFlag)
fmt.Printf("Job manager for %s terminated successfully\n", terminateJobIdFlag)
return nil
}

Expand Down
15 changes: 10 additions & 5 deletions frontend/app/store/wshclientapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class RpcApiType {
return client.wshRpcCall("connreinstallwsh", data, opts);
}

// command "connserverinit" [call]
ConnServerInitCommand(client: WshClient, data: CommandConnServerInitData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connserverinit", data, opts);
}

// command "connstatus" [call]
ConnStatusCommand(client: WshClient, opts?: RpcOpts): Promise<ConnStatus[]> {
return client.wshRpcCall("connstatus", null, opts);
Expand All @@ -112,6 +117,11 @@ class RpcApiType {
return client.wshRpcCall("controllerappendoutput", data, opts);
}

// command "controllerdestroy" [call]
ControllerDestroyCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerdestroy", data, opts);
}

// command "controllerinput" [call]
ControllerInputCommand(client: WshClient, data: CommandBlockInputData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerinput", data, opts);
Expand All @@ -122,11 +132,6 @@ class RpcApiType {
return client.wshRpcCall("controllerresync", data, opts);
}

// command "controllerstop" [call]
ControllerStopCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerstop", data, opts);
}

// command "createblock" [call]
CreateBlockCommand(client: WshClient, data: CommandCreateBlockData, opts?: RpcOpts): Promise<ORef> {
return client.wshRpcCall("createblock", data, opts);
Expand Down
1 change: 0 additions & 1 deletion frontend/app/view/term/term.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ const TerminalView = ({ blockId, model }: ViewComponentProps<TermViewModel>) =>
useWebGl: !termSettings?.["term:disablewebgl"],
sendDataHandler: model.sendDataToController.bind(model),
nodeModel: model.nodeModel,
jobId: blockData?.jobid,
}
);
(window as any).term = termWrap;
Expand Down
5 changes: 1 addition & 4 deletions frontend/app/view/term/termwrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type TermWrapOptions = {
useWebGl?: boolean;
sendDataHandler?: (data: string) => void;
nodeModel?: BlockNodeModel;
jobId?: string;
};

// for xterm OSC handlers, we return true always because we "own" the OSC number.
Expand Down Expand Up @@ -375,7 +374,6 @@ function handleOsc16162Command(data: string, blockId: string, loaded: boolean, t
export class TermWrap {
tabId: string;
blockId: string;
jobId: string;
ptyOffset: number;
dataBytesProcessed: number;
terminal: Terminal;
Expand Down Expand Up @@ -423,7 +421,6 @@ export class TermWrap {
this.loaded = false;
this.tabId = tabId;
this.blockId = blockId;
this.jobId = waveOptions.jobId;
this.sendDataHandler = waveOptions.sendDataHandler;
this.nodeModel = waveOptions.nodeModel;
this.ptyOffset = 0;
Expand Down Expand Up @@ -498,7 +495,7 @@ export class TermWrap {
}

getZoneId(): string {
return this.jobId ?? this.blockId;
return this.blockId;
}

resetCompositionState() {
Expand Down
10 changes: 5 additions & 5 deletions frontend/app/view/tsunami/tsunami.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ class TsunamiViewModel extends WebViewModel {
this.doControllerResync(false, "resync", false);
}

stopController() {
const prtn = RpcApi.ControllerStopCommand(TabRpcClient, this.blockId);
prtn.catch((e) => console.log("error stopping controller", e));
destroyController() {
const prtn = RpcApi.ControllerDestroyCommand(TabRpcClient, this.blockId);
prtn.catch((e) => console.log("error destroying controller", e));
}

async restartController() {
Expand All @@ -130,7 +130,7 @@ class TsunamiViewModel extends WebViewModel {
this.triggerRestartAtom();
try {
// Stop the controller first
await RpcApi.ControllerStopCommand(TabRpcClient, this.blockId);
await RpcApi.ControllerDestroyCommand(TabRpcClient, this.blockId);
// Wait a bit for the controller to fully stop
await new Promise((resolve) => setTimeout(resolve, 300));
// Then resync to restart it
Expand Down Expand Up @@ -202,7 +202,7 @@ class TsunamiViewModel extends WebViewModel {
const tsunamiItems: ContextMenuItem[] = [
{
label: "Stop WaveApp",
click: () => this.stopController(),
click: () => this.destroyController(),
},
{
label: "Restart WaveApp",
Expand Down
Loading
Loading