[Fuchsia] Implement timeout for Cast Streaming

* Kill the Session after 10s of no activity in the audio or video
  stream.
* Kill the Session after 5s if no offer message is received.
* Fix reporting of messages from unknown namespaces in the Cast
  Streaming MessagePort.

Bug: 1087528, 1134441
Change-Id: I69b44bea3b142e7cf12c32106a279ef37710c56b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2451525
Reviewed-by: Kevin Marshall <[email protected]>
Commit-Queue: Fabrice de Gans-Riberi <[email protected]>
Cr-Commit-Position: refs/heads/master@{#815876}
(cherry picked from commit 274ebeda8528cb28b23a3a54a50bd57d185d25fd)

[email protected]

Change-Id: Ie94bcff8ea2417205cd09a1d248d038055700d7c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2466496
Reviewed-by: Fabrice de Gans-Riberi <[email protected]>
Commit-Queue: Fabrice de Gans-Riberi <[email protected]>
Cr-Commit-Position: refs/branch-heads/4280@{#257}
Cr-Branched-From: ea420fb963f9658c9969b6513c56b8f47efa1a2a-refs/heads/master@{#812852}
diff --git a/fuchsia/cast_streaming/cast_message_port_impl.cc b/fuchsia/cast_streaming/cast_message_port_impl.cc
index 906f305..e362fe7 100644
--- a/fuchsia/cast_streaming/cast_message_port_impl.cc
+++ b/fuchsia/cast_streaming/cast_message_port_impl.cc
@@ -267,7 +267,7 @@
     client_->OnMessage(sender_id, message_namespace, str_message);
   } else if (message_namespace == kInjectNamespace) {
     SendInjectResponse(sender_id, str_message);
-  } else if (message_namespace == kSystemNamespace) {
+  } else if (message_namespace != kSystemNamespace) {
     // System messages are ignored, log messages from unknown namespaces.
     DVLOG(2) << "Unknown message from " << sender_id
              << ", namespace=" << message_namespace
diff --git a/fuchsia/cast_streaming/cast_streaming_session.cc b/fuchsia/cast_streaming/cast_streaming_session.cc
index 3e5d348a..3561e302 100644
--- a/fuchsia/cast_streaming/cast_streaming_session.cc
+++ b/fuchsia/cast_streaming/cast_streaming_session.cc
@@ -8,6 +8,7 @@
 
 #include "base/bind.h"
 #include "base/notreached.h"
+#include "base/timer/timer.h"
 #include "components/openscreen_platform/network_context.h"
 #include "components/openscreen_platform/network_util.h"
 #include "components/openscreen_platform/task_runner.h"
@@ -26,6 +27,9 @@
 constexpr char kVideoCodecH264[] = "h264";
 constexpr char kVideoCodecVp8[] = "vp8";
 
+// Timeout to end the Session when no offer message is sent.
+constexpr base::TimeDelta kInitTimeout = base::TimeDelta::FromSeconds(5);
+
 }  // namespace
 
 namespace cast_streaming {
@@ -48,19 +52,24 @@
       : task_runner_(task_runner),
         environment_(&openscreen::Clock::now, &task_runner_),
         cast_message_port_impl_(std::move(message_port_request)),
-        // TODO(crbug.com/1087520): Add streaming session Constraints and
-        // DisplayDescription.
-        receiver_session_(this,
-                          &environment_,
-                          &cast_message_port_impl_,
-                          openscreen::cast::ReceiverSession::Preferences(
-                              {openscreen::cast::VideoCodec::kH264,
-                               openscreen::cast::VideoCodec::kVp8},
-                              {openscreen::cast::AudioCodec::kAac,
-                               openscreen::cast::AudioCodec::kOpus})),
         client_(client) {
     DCHECK(task_runner);
     DCHECK(client_);
+
+    // TODO(crbug.com/1087520): Add streaming session Constraints and
+    // DisplayDescription.
+    receiver_session_ = std::make_unique<openscreen::cast::ReceiverSession>(
+        this, &environment_, &cast_message_port_impl_,
+        openscreen::cast::ReceiverSession::Preferences(
+            {openscreen::cast::VideoCodec::kH264,
+             openscreen::cast::VideoCodec::kVp8},
+            {openscreen::cast::AudioCodec::kAac,
+             openscreen::cast::AudioCodec::kOpus}));
+
+    init_timeout_timer_.Start(
+        FROM_HERE, kInitTimeout,
+        base::BindOnce(&CastStreamingSession::Internal::OnInitializationTimeout,
+                       base::Unretained(this)));
   }
 
   ~Internal() final = default;
@@ -69,14 +78,22 @@
   Internal& operator=(const Internal&) = delete;
 
  private:
+  void OnInitializationTimeout() {
+    DVLOG(1) << __func__;
+    DCHECK(!is_initialized_);
+    client_->OnInitializationFailure();
+    is_initialized_ = true;
+  }
+
   // openscreen::cast::ReceiverSession::Client implementation.
   void OnNegotiated(
       const openscreen::cast::ReceiverSession* session,
       openscreen::cast::ReceiverSession::ConfiguredReceivers receivers) final {
     DVLOG(1) << __func__;
-    DCHECK_EQ(session, &receiver_session_);
+    DCHECK_EQ(session, receiver_session_.get());
+    init_timeout_timer_.Stop();
 
-    if (initialized_called_) {
+    if (is_initialized_) {
       // TODO(crbug.com/1116185): Handle multiple offer messages properly.
       return;
     }
@@ -99,11 +116,15 @@
       }
 
       // Initialize the audio consumer.
+      // We can use unretained pointers here because StreamConsumer is owned by
+      // this object and |client_| is guaranteed to outlive this object.
       audio_consumer_ = std::make_unique<StreamConsumer>(
           receivers.audio->receiver, std::move(data_pipe_producer),
           base::BindRepeating(
               &CastStreamingSession::Client::OnAudioBufferReceived,
-              base::Unretained(client_)));
+              base::Unretained(client_)),
+          base::BindOnce(&CastStreamingSession::Internal::OnDataTimeout,
+                         base::Unretained(this)));
 
       // Gather data for the audio decoder config.
       media::ChannelLayout channel_layout =
@@ -143,11 +164,15 @@
       }
 
       // Initialize the video consumer.
+      // We can use unretained pointers here because StreamConsumer is owned by
+      // this object and |client_| is guaranteed to outlive this object.
       video_consumer_ = std::make_unique<StreamConsumer>(
           receivers.video->receiver, std::move(data_pipe_producer),
           base::BindRepeating(
               &CastStreamingSession::Client::OnVideoBufferReceived,
-              base::Unretained(client_)));
+              base::Unretained(client_)),
+          base::BindOnce(&CastStreamingSession::Internal::OnDataTimeout,
+                         base::Unretained(this)));
 
       // Gather data for the video decoder config.
       const std::string& video_codec =
@@ -193,14 +218,15 @@
       client_->OnInitializationSuccess(std::move(audio_stream_info),
                                        std::move(video_stream_info));
     }
-    initialized_called_ = true;
+    is_initialized_ = true;
   }
 
   // TODO(https://crbug.com/1116185): Handle |reason| and reset streams on a
   // new offer message.
   void OnReceiversDestroying(const openscreen::cast::ReceiverSession* session,
                              ReceiversDestroyingReason reason) final {
-    DCHECK_EQ(session, &receiver_session_);
+    // This can be called when |receiver_session_| is being destroyed, so we
+    // do not sanity-check |session| here.
     DVLOG(1) << __func__;
     audio_consumer_.reset();
     video_consumer_.reset();
@@ -209,20 +235,26 @@
 
   void OnError(const openscreen::cast::ReceiverSession* session,
                openscreen::Error error) final {
-    DCHECK_EQ(session, &receiver_session_);
+    DCHECK_EQ(session, receiver_session_.get());
     LOG(ERROR) << error;
-    if (!initialized_called_) {
+    if (!is_initialized_) {
       client_->OnInitializationFailure();
-      initialized_called_ = true;
+      is_initialized_ = true;
     }
   }
 
+  void OnDataTimeout() {
+    DVLOG(1) << __func__;
+    receiver_session_.reset();
+  }
+
   openscreen_platform::TaskRunner task_runner_;
   openscreen::cast::Environment environment_;
   CastMessagePortImpl cast_message_port_impl_;
-  openscreen::cast::ReceiverSession receiver_session_;
+  std::unique_ptr<openscreen::cast::ReceiverSession> receiver_session_;
+  base::OneShotTimer init_timeout_timer_;
 
-  bool initialized_called_ = false;
+  bool is_initialized_ = false;
   CastStreamingSession::Client* const client_;
   std::unique_ptr<openscreen::cast::Receiver::Consumer> audio_consumer_;
   std::unique_ptr<openscreen::cast::Receiver::Consumer> video_consumer_;
diff --git a/fuchsia/cast_streaming/stream_consumer.cc b/fuchsia/cast_streaming/stream_consumer.cc
index 353ac52..ff2a53f 100644
--- a/fuchsia/cast_streaming/stream_consumer.cc
+++ b/fuchsia/cast_streaming/stream_consumer.cc
@@ -10,9 +10,17 @@
 
 namespace cast_streaming {
 
+namespace {
+
+// Timeout to stop the Session when no data is received.
+constexpr base::TimeDelta kNoDataTimeout = base::TimeDelta::FromSeconds(10);
+
+}  // namespace
+
 StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver,
                                mojo::ScopedDataPipeProducerHandle data_pipe,
-                               FrameReceivedCB frame_received_cb)
+                               FrameReceivedCB frame_received_cb,
+                               base::OnceClosure on_timeout)
     : receiver_(receiver),
       data_pipe_(std::move(data_pipe)),
       frame_received_cb_(std::move(frame_received_cb)),
@@ -27,7 +35,10 @@
                                               base::Unretained(this)));
   if (result != MOJO_RESULT_OK) {
     CloseDataPipeOnError();
+    return;
   }
+
+  data_timeout_timer_.Start(FROM_HERE, kNoDataTimeout, std::move(on_timeout));
 }
 
 StreamConsumer::~StreamConsumer() {
@@ -39,6 +50,7 @@
   receiver_->SetConsumer(nullptr);
   pipe_watcher_.Cancel();
   data_pipe_.reset();
+  data_timeout_timer_.Stop();
 }
 
 void StreamConsumer::OnPipeWritable(MojoResult result) {
@@ -72,6 +84,7 @@
 
 void StreamConsumer::OnFramesReady(int next_frame_buffer_size) {
   DCHECK(data_pipe_);
+  data_timeout_timer_.Reset();
 
   if (pending_buffer_remaining_bytes_ != 0) {
     // There already is a pending frame. Ignore this one for now.
diff --git a/fuchsia/cast_streaming/stream_consumer.h b/fuchsia/cast_streaming/stream_consumer.h
index eef5f14b..0fc3ffd 100644
--- a/fuchsia/cast_streaming/stream_consumer.h
+++ b/fuchsia/cast_streaming/stream_consumer.h
@@ -8,6 +8,7 @@
 #include <fuchsia/media/cpp/fidl.h>
 
 #include "base/callback.h"
+#include "base/timer/timer.h"
 #include "media/mojo/mojom/media_types.mojom.h"
 #include "mojo/public/cpp/system/data_pipe.h"
 #include "mojo/public/cpp/system/simple_watcher.h"
@@ -33,9 +34,11 @@
   // |receiver| sends frames to this object. It must outlive this object.
   // |frame_received_cb| is called on every new frame, after a new frame has
   // been written to |data_pipe|. On error, |data_pipe| will be closed.
+  // If no data is received for 10 seconds, |on_timeout| will be closed.
   StreamConsumer(openscreen::cast::Receiver* receiver,
                  mojo::ScopedDataPipeProducerHandle data_pipe,
-                 FrameReceivedCB frame_received_cb);
+                 FrameReceivedCB frame_received_cb,
+                 base::OnceClosure on_timeout);
   ~StreamConsumer() final;
 
   StreamConsumer(const StreamConsumer&) = delete;
@@ -70,6 +73,9 @@
 
   // Remaining bytes to write from |pending_buffer_| to |data_pipe_|.
   size_t pending_buffer_remaining_bytes_ = 0;
+
+  // Timer to trigger connection closure if no data is received for 10 seconds.
+  base::OneShotTimer data_timeout_timer_;
 };
 
 }  // namespace cast_streaming
diff --git a/fuchsia/engine/browser/cast_streaming_session_client.cc b/fuchsia/engine/browser/cast_streaming_session_client.cc
index 5b022f9..5c0508a 100644
--- a/fuchsia/engine/browser/cast_streaming_session_client.cc
+++ b/fuchsia/engine/browser/cast_streaming_session_client.cc
@@ -89,6 +89,13 @@
 
   // Tear down the Mojo connection.
   cast_streaming_receiver_.reset();
+
+  // Tear down all remaining Mojo objects if needed. This is necessary if the
+  // Cast Streaming Session ending was initiated by the receiver component.
+  if (audio_remote_)
+    audio_remote_.reset();
+  if (video_remote_)
+    video_remote_.reset();
 }
 
 void CastStreamingSessionClient::OnMojoDisconnect() {
diff --git a/fuchsia/engine/renderer/cast_streaming_demuxer.cc b/fuchsia/engine/renderer/cast_streaming_demuxer.cc
index dc8413ca..2f7bc6de 100644
--- a/fuchsia/engine/renderer/cast_streaming_demuxer.cc
+++ b/fuchsia/engine/renderer/cast_streaming_demuxer.cc
@@ -63,7 +63,7 @@
       return;
 
     if (current_buffer_->end_of_stream()) {
-      std::move(pending_read_cb_).Run(Status::kAborted, nullptr);
+      std::move(pending_read_cb_).Run(Status::kError, nullptr);
       return;
     }
 
@@ -198,7 +198,6 @@
 
 CastStreamingDemuxer::~CastStreamingDemuxer() {
   DVLOG(1) << __func__;
-  DCHECK(media_task_runner_->BelongsToCurrentThread());
 
   if (was_initialization_successful_) {
     original_task_runner_->PostTask(
diff --git a/fuchsia/runners/cast/data/receiver.html b/fuchsia/runners/cast/data/receiver.html
index 29a914e1..faf7eb4 100644
--- a/fuchsia/runners/cast/data/receiver.html
+++ b/fuchsia/runners/cast/data/receiver.html
@@ -17,7 +17,7 @@
 </head>
 
 <body>
-  <video src="data:cast_streaming_receiver" autoplay>
+  <video src="data:cast_streaming_receiver">
 
   <script>
     // The Cast Streaming session must stop when the stream is no longer visible. crbug.com/1111886
@@ -27,10 +27,10 @@
       }
     });
 
-    // TODO(crbug.com/1087528): This should not be necessary. Figure out why
-    // autoplay is not enough here.
     var video = document.querySelector('video');
-    video.play();
+    video.addEventListener('ended', window.close);
+    video.addEventListener('error', window.close);
+    video.play().catch(window.close);
   </script>
 </body>
 </html>