-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530
base: main
Are you sure you want to change the base?
Conversation
robot/web/stream/server.go
Outdated
@@ -336,6 +336,8 @@ func (server *Server) GetStreamOptions( | |||
ctx context.Context, | |||
req *streampb.GetStreamOptionsRequest, | |||
) (*streampb.GetStreamOptionsResponse, error) { | |||
server.mu.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to add this read lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mutex is protecting these struct members:
nameToStreamState map[string]*state.StreamState
activePeerStreams map[*webrtc.PeerConnection]map[string]*peerState
activeBackgroundWorkers sync.WaitGroup
isAlive bool
I don't see them being accessed so I don't understand why this mutex is being taken
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call - I am just pulling the camera from robot not accessing any of these. Removed.
robot/web/stream/server.go
Outdated
if req.Name == "" { | ||
return nil, errors.New("stream name is required in request") | ||
} | ||
if req.Resolution == nil { | ||
return nil, fmt.Errorf("resolution is required to resize stream %q", req.Name) | ||
} | ||
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { | ||
return nil, fmt.Errorf( | ||
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", | ||
req.Name, req.Resolution.Width, req.Resolution.Height, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets move this into a validation function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good - moved to helper.
robot/web/stream/server.go
Outdated
if err != nil { | ||
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) | ||
} | ||
streamState, ok := server.nameToStreamState[req.Name] | ||
if !ok { | ||
return nil, fmt.Errorf("stream state not found with name %q", req.Name) | ||
} | ||
err = streamState.Resize() | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to resize stream %q: %w", req.Name, err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this logic into resizeVideoSource
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep -- moved all resize logic into resizeVideoSource
.
@@ -185,6 +200,10 @@ func (state *StreamState) sourceEventHandler() { | |||
if state.activeClients == 0 { | |||
state.tick() | |||
} | |||
case msgTypeResize: | |||
state.logger.Debug("resize event received") | |||
state.isResized = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we ever go back to not resized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for bringing this up -- the Reset path has been implemented to handle getting out of resized mode.
t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) { | ||
var startCount atomic.Int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of your tests are time based and waiting for assertions, can you run with -race
and -failfast
in canon to verify that they're not flaky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
go test -timeout 30s -race -failfast -run ^TestSetStreamOptions$ go.viam.com/rdk/robot/web/stream
ok go.viam.com/rdk/robot/web/stream 3.255s
go test -timeout 30s -race -failfast -run ^TestStreamState$/^when_in_rtppassthrough_mode_and_a_resize_occurs_test_downgrade_path_to_gostream$ go.viam.com/rdk/robot/web/stream/state
ok go.viam.com/rdk/robot/web/stream/state 3.056s
@@ -271,7 +302,7 @@ func (state *StreamState) tick() { | |||
case state.streamSource == streamSourcePassthrough: | |||
// no op if we are using passthrough & are healthy | |||
state.logger.Debug("still healthy and using h264 passthrough") | |||
case state.streamSource == streamSourceGoStream: | |||
case state.streamSource == streamSourceGoStream && !state.isResized: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does a user do to hit this state after they've selected a resolution in the UI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented the Reset
path for this. Not exactly sure how to handle this from a UI perspective -- may need to update the wire-diagram in the scope doc to include a Reset button.
robot/web/stream/server.go
Outdated
|
||
// validateSetStreamOptionsRequest validates the request to set the stream options. | ||
func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) { | ||
if req.Name == "" { | ||
return optionsCommandUnknown, errors.New("stream name is required in request") | ||
} | ||
if req.Resolution == nil { | ||
return optionsCommandReset, nil | ||
} | ||
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { | ||
return optionsCommandUnknown, | ||
fmt.Errorf( | ||
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", | ||
req.Name, req.Resolution.Width, req.Resolution.Height, | ||
) | ||
} | ||
return optionsCommandResize, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: put this above where it's used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, moved.
Description
RSDK-8982, RSDK-9167 RSDK-8979
This PR adds the
SetStreamOptions
endpoint to the stream server.NewResizeVideoSource
.Resize
msg to thestreamState
handler.Increment
andDecrement
calls.Reset
msg to thestreamState
handler.In order to unify RTP
Sequence Number
in the header between GoStream and Passthrough paths we now override the Sequence number in theWriteRTP
handler.Tests
NewStreamServiceClient
script as seen below):Demos
Screen.Recording.2024-11-12.at.2.05.18.PM.mov
passthrough_gostream_handoff.mov
TODO:
Test Script