You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							336 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							336 lines
						
					
					
						
							11 KiB
						
					
					
				| #include "tools/replay/replay.h"
 | |
| 
 | |
| #include <capnp/dynamic.h>
 | |
| #include <csignal>
 | |
| #include "cereal/services.h"
 | |
| #include "common/params.h"
 | |
| #include "tools/replay/util.h"
 | |
| 
 | |
| static void interrupt_sleep_handler(int signal) {}
 | |
| 
 | |
| // Helper function to notify events with safety checks
 | |
| template <typename Callback, typename... Args>
 | |
| void notifyEvent(Callback &callback, Args &&...args) {
 | |
|   if (callback) callback(std::forward<Args>(args)...);
 | |
| }
 | |
| 
 | |
| Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block,
 | |
|                SubMaster *sm, uint32_t flags, const std::string &data_dir, bool auto_source)
 | |
|     : sm_(sm), flags_(flags), seg_mgr_(std::make_unique<SegmentManager>(route, flags, data_dir, auto_source)) {
 | |
|   std::signal(SIGUSR1, interrupt_sleep_handler);
 | |
| 
 | |
|   if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
 | |
|     block.insert(block.end(), {"uiDebug", "userFlag"});
 | |
|   }
 | |
|   setupServices(allow, block);
 | |
|   setupSegmentManager(!allow.empty() || !block.empty());
 | |
| }
 | |
| 
 | |
| void Replay::setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block) {
 | |
|   auto event_schema = capnp::Schema::from<cereal::Event>().asStruct();
 | |
|   sockets_.resize(event_schema.getUnionFields().size(), nullptr);
 | |
| 
 | |
|   std::vector<const char *> active_services;
 | |
|   for (const auto &[name, _] : services) {
 | |
|     bool is_blocked = std::find(block.begin(), block.end(), name) != block.end();
 | |
|     bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end();
 | |
|     if (is_allowed && !is_blocked) {
 | |
|       uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue();
 | |
|       sockets_[which] = name.c_str();
 | |
|       active_services.push_back(name.c_str());
 | |
|     }
 | |
|   }
 | |
|   rInfo("active services: %s", join(active_services, ", ").c_str());
 | |
|   if (!sm_) {
 | |
|     pm_ = std::make_unique<PubMaster>(active_services);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void Replay::setupSegmentManager(bool has_filters) {
 | |
|   seg_mgr_->setCallback([this]() { handleSegmentMerge(); });
 | |
| 
 | |
|   if (has_filters) {
 | |
|     std::vector<bool> filters(sockets_.size(), false);
 | |
|     for (size_t i = 0; i < sockets_.size(); ++i) {
 | |
|       filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
 | |
|     }
 | |
|     seg_mgr_->setFilters(filters);
 | |
|   }
 | |
| }
 | |
| 
 | |
| Replay::~Replay() {
 | |
|   seg_mgr_.reset();
 | |
|   if (stream_thread_.joinable()) {
 | |
|     rInfo("shutdown: in progress...");
 | |
|     interruptStream([this]() {
 | |
|       exit_ = true;
 | |
|       return false;
 | |
|     });
 | |
|     stream_thread_.join();
 | |
|     rInfo("shutdown: done");
 | |
|   }
 | |
|   camera_server_.reset();
 | |
| }
 | |
| 
 | |
| bool Replay::load() {
 | |
|   rInfo("loading route %s", seg_mgr_->route_.name().c_str());
 | |
|   if (!seg_mgr_->load()) return false;
 | |
| 
 | |
|   min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60;
 | |
|   max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60;
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void Replay::interruptStream(const std::function<bool()> &update_fn) {
 | |
|   if (stream_thread_.joinable() && stream_thread_id) {
 | |
|     pthread_kill(stream_thread_id, SIGUSR1);  // Interrupt sleep in stream thread
 | |
|   }
 | |
|   {
 | |
|     interrupt_requested_ = true;
 | |
|     std::unique_lock lock(stream_lock_);
 | |
|     events_ready_ = update_fn();
 | |
|     interrupt_requested_ = user_paused_;
 | |
|   }
 | |
|   stream_cv_.notify_one();
 | |
| }
 | |
| 
 | |
| void Replay::seekTo(double seconds, bool relative) {
 | |
|   double target_time = relative ? seconds + currentSeconds() : seconds;
 | |
|   target_time = std::max(0.0, target_time);
 | |
|   int target_segment = target_time / 60;
 | |
|   if (!seg_mgr_->hasSegment(target_segment)) {
 | |
|     rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
 | |
|   notifyEvent(onSeeking, target_time);
 | |
| 
 | |
|   interruptStream([&]() {
 | |
|     current_segment_.store(target_segment);
 | |
|     cur_mono_time_ = route_start_ts_ + target_time * 1e9;
 | |
|     cur_which_ = cereal::Event::Which::INIT_DATA;
 | |
|     seeking_to_.store(target_time, std::memory_order_relaxed);
 | |
|     return false;
 | |
|   });
 | |
| 
 | |
|   seg_mgr_->setCurrentSegment(target_segment);
 | |
|   checkSeekProgress();
 | |
| }
 | |
| 
 | |
| void Replay::checkSeekProgress() {
 | |
|   if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_.load())) return;
 | |
| 
 | |
|   double seek_to = seeking_to_.exchange(-1.0, std::memory_order_acquire);
 | |
|   if (seek_to >= 0 && onSeekedTo) {
 | |
|     onSeekedTo(seek_to);
 | |
|   }
 | |
| 
 | |
|   // Resume the interrupted stream
 | |
|   interruptStream([]() { return true; });
 | |
| }
 | |
| 
 | |
| void Replay::seekToFlag(FindFlag flag) {
 | |
|   if (auto next = timeline_.find(currentSeconds(), flag)) {
 | |
|     seekTo(*next - 2, false);  // seek to 2 seconds before next
 | |
|   }
 | |
| }
 | |
| 
 | |
| void Replay::pause(bool pause) {
 | |
|   if (user_paused_ != pause) {
 | |
|     interruptStream([=]() {
 | |
|       rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
 | |
|       user_paused_ = pause;
 | |
|       return !pause;
 | |
|     });
 | |
|   }
 | |
| }
 | |
| 
 | |
| void Replay::handleSegmentMerge() {
 | |
|   if (exit_) return;
 | |
| 
 | |
|   auto event_data = seg_mgr_->getEventData();
 | |
|   if (!stream_thread_.joinable() && !event_data->segments.empty()) {
 | |
|     startStream(event_data->segments.begin()->second);
 | |
|   }
 | |
|   notifyEvent(onSegmentsMerged);
 | |
| 
 | |
|   // Interrupt the stream to handle segment merge
 | |
|   interruptStream([]() { return false; });
 | |
|   checkSeekProgress();
 | |
| }
 | |
| 
 | |
| void Replay::startStream(const std::shared_ptr<Segment> segment) {
 | |
|   const auto &events = segment->log->events;
 | |
|   route_start_ts_ = events.front().mono_time;
 | |
|   cur_mono_time_ += route_start_ts_ - 1;
 | |
| 
 | |
|   // get datetime from INIT_DATA, fallback to datetime in the route name
 | |
|   route_date_time_ = route().datetime();
 | |
|   auto it = std::find_if(events.cbegin(), events.cend(),
 | |
|                          [](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
 | |
|   if (it != events.cend()) {
 | |
|     capnp::FlatArrayMessageReader reader(it->data);
 | |
|     auto event = reader.getRoot<cereal::Event>();
 | |
|     uint64_t wall_time = event.getInitData().getWallTimeNanos();
 | |
|     if (wall_time > 0) {
 | |
|       route_date_time_ = wall_time / 1e6;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // write CarParams
 | |
|   it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
 | |
|   if (it != events.end()) {
 | |
|     capnp::FlatArrayMessageReader reader(it->data);
 | |
|     auto event = reader.getRoot<cereal::Event>();
 | |
|     car_fingerprint_ = event.getCarParams().getCarFingerprint();
 | |
| 
 | |
|     capnp::MallocMessageBuilder builder;
 | |
|     builder.setRoot(event.getCarParams());
 | |
|     auto words = capnp::messageToFlatArray(builder);
 | |
|     auto bytes = words.asBytes();
 | |
|     Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
 | |
|     Params().put("CarParamsPersistent", (const char *)bytes.begin(), bytes.size());
 | |
|   } else {
 | |
|     rWarning("failed to read CarParams from current segment");
 | |
|   }
 | |
| 
 | |
|   // start camera server
 | |
|   if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
 | |
|     std::pair<int, int> camera_size[MAX_CAMERAS] = {};
 | |
|     for (auto type : ALL_CAMERAS) {
 | |
|       if (auto &fr = segment->frames[type]) {
 | |
|         camera_size[type] = {fr->width, fr->height};
 | |
|       }
 | |
|     }
 | |
|     camera_server_ = std::make_unique<CameraServer>(camera_size);
 | |
|   }
 | |
| 
 | |
|   timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
 | |
|                        [this](std::shared_ptr<LogReader> log) { notifyEvent(onQLogLoaded, log); });
 | |
| 
 | |
|   stream_thread_ = std::thread(&Replay::streamThread, this);
 | |
| }
 | |
| 
 | |
| void Replay::publishMessage(const Event *e) {
 | |
|   if (event_filter_ && event_filter_(e)) return;
 | |
| 
 | |
|   if (!sm_) {
 | |
|     auto bytes = e->data.asBytes();
 | |
|     int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
 | |
|     if (ret == -1) {
 | |
|       rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
 | |
|       sockets_[e->which] = nullptr;
 | |
|     }
 | |
|   } else {
 | |
|     capnp::FlatArrayMessageReader reader(e->data);
 | |
|     auto event = reader.getRoot<cereal::Event>();
 | |
|     sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
 | |
|   }
 | |
| }
 | |
| 
 | |
| void Replay::publishFrame(const Event *e) {
 | |
|   CameraType cam;
 | |
|   switch (e->which) {
 | |
|     case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
 | |
|     case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
 | |
|     case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
 | |
|     default: return;  // Invalid event type
 | |
|   }
 | |
| 
 | |
|   if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
 | |
|     return;  // Camera isdisabled
 | |
| 
 | |
|   auto seg_it = event_data_->segments.find(e->eidx_segnum);
 | |
|   if (seg_it != event_data_->segments.end()) {
 | |
|     if (auto &frame = seg_it->second->frames[cam]; frame) {
 | |
|       camera_server_->pushFrame(cam, frame.get(), e);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void Replay::streamThread() {
 | |
|   stream_thread_id = pthread_self();
 | |
|   std::unique_lock lk(stream_lock_);
 | |
| 
 | |
|   while (true) {
 | |
|     stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
 | |
|     if (exit_) break;
 | |
| 
 | |
|     event_data_ = seg_mgr_->getEventData();
 | |
|     const auto &events = event_data_->events;
 | |
|     auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {}));
 | |
|     if (first == events.cend()) {
 | |
|       rInfo("waiting for events...");
 | |
|       events_ready_ = false;
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     auto it = publishEvents(first, events.cend());
 | |
| 
 | |
|     // Ensure frames are sent before unlocking to prevent race conditions
 | |
|     if (camera_server_) {
 | |
|       camera_server_->waitForSent();
 | |
|     }
 | |
| 
 | |
|     if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
 | |
|       int last_segment = seg_mgr_->route_.segments().rbegin()->first;
 | |
|       if (event_data_->isSegmentLoaded(last_segment)) {
 | |
|         rInfo("reaches the end of route, restart from beginning");
 | |
|         stream_lock_.unlock();
 | |
|         seekTo(minSeconds(), false);
 | |
|         stream_lock_.lock();
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
 | |
|                                                          std::vector<Event>::const_iterator last) {
 | |
|   uint64_t evt_start_ts = cur_mono_time_;
 | |
|   uint64_t loop_start_ts = nanos_since_boot();
 | |
|   double prev_replay_speed = speed_;
 | |
| 
 | |
|   for (; !interrupt_requested_ && first != last; ++first) {
 | |
|     const Event &evt = *first;
 | |
| 
 | |
|     int segment = toSeconds(evt.mono_time) / 60;
 | |
|     if (current_segment_.load(std::memory_order_relaxed) != segment) {
 | |
|       current_segment_.store(segment, std::memory_order_relaxed);
 | |
|       seg_mgr_->setCurrentSegment(segment);
 | |
|     }
 | |
| 
 | |
|     cur_mono_time_ = evt.mono_time;
 | |
|     cur_which_ = evt.which;
 | |
| 
 | |
|     // Skip events if socket is not present
 | |
|     if (!sockets_[evt.which]) continue;
 | |
| 
 | |
|     const uint64_t current_nanos = nanos_since_boot();
 | |
|     const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);
 | |
| 
 | |
|     // Reset timestamps for potential synchronization issues:
 | |
|     // - A negative time_diff may indicate slow execution or system wake-up,
 | |
|     // - A time_diff exceeding 1 second suggests a skipped segment.
 | |
|     if ((time_diff < -1e9 || time_diff >= 1e9) || speed_ != prev_replay_speed) {
 | |
|       evt_start_ts = evt.mono_time;
 | |
|       loop_start_ts = current_nanos;
 | |
|       prev_replay_speed = speed_;
 | |
|     } else if (time_diff > 0) {
 | |
|       precise_nano_sleep(time_diff, interrupt_requested_);
 | |
|     }
 | |
| 
 | |
|     if (interrupt_requested_) break;
 | |
| 
 | |
|     if (evt.eidx_segnum == -1) {
 | |
|       publishMessage(&evt);
 | |
|     } else if (camera_server_) {
 | |
|       if (speed_ > 1.0) {
 | |
|         camera_server_->waitForSent();
 | |
|       }
 | |
|       publishFrame(&evt);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return first;
 | |
| }
 | |
| 
 |