diff --git a/cereal b/cereal index 3be0bf50c8..bab2f2b95e 160000 --- a/cereal +++ b/cereal @@ -1 +1 @@ -Subproject commit 3be0bf50c801f4d01f92ee08cebcf960f09b180d +Subproject commit bab2f2b95e38da4c968efc31369163056e938b30 diff --git a/selfdrive/locationd/locationd.cc b/selfdrive/locationd/locationd.cc index dcdae4e84b..08f105dd99 100755 --- a/selfdrive/locationd/locationd.cc +++ b/selfdrive/locationd/locationd.cc @@ -340,8 +340,8 @@ kj::ArrayPtr Localizer::get_message_bytes(MessageBuilder& msg_build int Localizer::locationd_thread() { const std::initializer_list service_list = { "gpsLocationExternal", "sensorEvents", "cameraOdometry", "liveCalibration", "carState" }; - SubMaster sm(service_list, nullptr, { "gpsLocationExternal" }); PubMaster pm({ "liveLocationKalman" }); + SubMaster sm(service_list, nullptr, { "gpsLocationExternal" }); Params params; diff --git a/selfdrive/locationd/ubloxd.cc b/selfdrive/locationd/ubloxd.cc index 7b3c3039bd..a0b4ef7fad 100644 --- a/selfdrive/locationd/ubloxd.cc +++ b/selfdrive/locationd/ubloxd.cc @@ -13,12 +13,13 @@ int main() { AlignedBuffer aligned_buf; UbloxMsgParser parser; + PubMaster pm({"ubloxGnss", "gpsLocationExternal"}); + Context * context = Context::create(); SubSocket * subscriber = SubSocket::create(context, "ubloxRaw"); assert(subscriber != NULL); subscriber->setTimeout(100); - PubMaster pm({"ubloxGnss", "gpsLocationExternal"}); while (!do_exit) { Message * msg = subscriber->receive(); diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 1873340495..36125826e6 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -432,32 +432,38 @@ def python_replay_process(cfg, lr, fingerprint=None): def cpp_replay_process(cfg, lr, fingerprint=None): sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here pm = messaging.PubMaster(cfg.pub_sub.keys()) - sockets = {s: messaging.sub_sock(s, timeout=1000) for s in sub_sockets} all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] + os.environ["SIMULATION"] = "1" # Disable submaster alive checks managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() - time.sleep(1) # We give the process time to start + while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): + time.sleep(0) - log_msgs = [] + # Make sure all subscribers are connected + sockets = {s: messaging.sub_sock(s, timeout=1000) for s in sub_sockets} for s in sub_sockets: messaging.recv_one_or_none(sockets[s]) - for msg in tqdm(pub_msgs, disable=CI): + log_msgs = [] + for i, msg in enumerate(tqdm(pub_msgs, disable=CI)): pm.send(msg.which(), msg.as_builder()) + resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) for s in resp_sockets: response = messaging.recv_one(sockets[s]) - if response is not None: + + if response is None: + print(f"Warning, no response received {i}") + else: log_msgs.append(response) - if not len(resp_sockets): + if not len(resp_sockets): # We only need to wait if we didn't already wait for a response while not pm.all_readers_updated(msg.which()): time.sleep(0) - time.sleep(0.0001) managed_processes[cfg.proc_name].stop() return log_msgs