openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
4.5 KiB

#include "tools/replay/seg_mgr.h"
5 months ago
#include <algorithm>
SegmentManager::~SegmentManager() {
{
std::unique_lock lock(mutex_);
exit_ = true;
}
cv_.notify_one();
if (thread_.joinable()) thread_.join();
}
bool SegmentManager::load() {
if (!route_.load()) {
rError("failed to load route: %s", route_.name().c_str());
return false;
}
for (const auto &[n, file] : route_.segments()) {
if (!file.rlog.empty() || !file.qlog.empty()) {
segments_.insert({n, nullptr});
}
}
if (segments_.empty()) {
rInfo("no valid segments in route: %s", route_.name().c_str());
return false;
}
rInfo("loaded route %s with %zu valid segments", route_.name().c_str(), segments_.size());
thread_ = std::thread(&SegmentManager::manageSegmentCache, this);
return true;
}
void SegmentManager::setCurrentSegment(int seg_num) {
{
std::unique_lock lock(mutex_);
if (cur_seg_num_ == seg_num) return;
cur_seg_num_ = seg_num;
needs_update_ = true;
}
cv_.notify_one();
}
void SegmentManager::manageSegmentCache() {
while (true) {
std::unique_lock lock(mutex_);
cv_.wait(lock, [this]() { return exit_ || needs_update_; });
if (exit_) break;
needs_update_ = false;
auto cur = segments_.lower_bound(cur_seg_num_);
if (cur == segments_.end()) continue;
// Calculate the range of segments to load
auto begin = std::prev(cur, std::min<int>(segment_cache_limit_ / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end)));
lock.unlock();
loadSegmentsInRange(begin, cur, end);
bool merged = mergeSegments(begin, end);
// Free segments outside the current range
std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); });
std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); });
if (merged && onSegmentMergedCallback_) {
onSegmentMergedCallback_(); // Notify listener that segments have been merged
}
}
}
bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
std::set<int> segments_to_merge;
size_t total_event_count = 0;
for (auto it = begin; it != end; ++it) {
const auto &segment = it->second;
if (segment && segment->getState() == Segment::LoadState::Loaded) {
segments_to_merge.insert(segment->seg_num);
total_event_count += segment->log->events.size();
}
}
if (segments_to_merge == merged_segments_) return false;
auto merged_event_data = std::make_shared<EventData>();
auto &merged_events = merged_event_data->events;
merged_events.reserve(total_event_count);
rDebug("merging segments: %s", join(segments_to_merge, ", ").c_str());
for (int n : segments_to_merge) {
const auto &events = segments_.at(n)->log->events;
if (events.empty()) continue;
// Skip INIT_DATA if present
auto events_begin = (events.front().which == cereal::Event::Which::INIT_DATA) ? std::next(events.begin()) : events.begin();
size_t previous_size = merged_events.size();
merged_events.insert(merged_events.end(), events_begin, events.end());
std::inplace_merge(merged_events.begin(), merged_events.begin() + previous_size, merged_events.end());
merged_event_data->segments[n] = segments_.at(n);
}
std::atomic_store(&event_data_, std::move(merged_event_data));
merged_segments_ = segments_to_merge;
return true;
}
void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto tryLoadSegment = [this](auto first, auto last) {
for (auto it = first; it != last; ++it) {
auto &segment_ptr = it->second;
if (!segment_ptr) {
segment_ptr = std::make_shared<Segment>(
it->first, route_.at(it->first), flags_, filters_,
[this](int seg_num, bool success) {
std::unique_lock lock(mutex_);
needs_update_ = true;
cv_.notify_one();
});
}
if (segment_ptr->getState() == Segment::LoadState::Loading) {
return true; // Segment is still loading
}
}
return false; // No segments need loading
};
// Try forward loading, then reverse if necessary
if (!tryLoadSegment(cur, end)) {
tryLoadSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
}
}