|
|
@ -173,6 +173,9 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void handle_user_flag(LoggerdState *s) { |
|
|
|
void handle_user_flag(LoggerdState *s) { |
|
|
|
|
|
|
|
static int prev_segment = -1; |
|
|
|
|
|
|
|
if (s->rotate_segment == prev_segment) return; |
|
|
|
|
|
|
|
|
|
|
|
LOGW("preserving %s", s->segment_path); |
|
|
|
LOGW("preserving %s", s->segment_path); |
|
|
|
|
|
|
|
|
|
|
|
#ifdef __APPLE__ |
|
|
|
#ifdef __APPLE__ |
|
|
@ -183,16 +186,17 @@ void handle_user_flag(LoggerdState *s) { |
|
|
|
if (ret) { |
|
|
|
if (ret) { |
|
|
|
LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno)); |
|
|
|
LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
prev_segment = s->rotate_segment.load(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void loggerd_thread() { |
|
|
|
void loggerd_thread() { |
|
|
|
// setup messaging
|
|
|
|
// setup messaging
|
|
|
|
typedef struct QlogState { |
|
|
|
typedef struct ServiceState { |
|
|
|
std::string name; |
|
|
|
std::string name; |
|
|
|
int counter, freq; |
|
|
|
int counter, freq; |
|
|
|
bool encoder; |
|
|
|
bool encoder, user_flag; |
|
|
|
} QlogState; |
|
|
|
} ServiceState; |
|
|
|
std::unordered_map<SubSocket*, QlogState> qlog_states; |
|
|
|
std::unordered_map<SubSocket*, ServiceState> service_state; |
|
|
|
std::unordered_map<SubSocket*, struct RemoteEncoder> remote_encoders; |
|
|
|
std::unordered_map<SubSocket*, struct RemoteEncoder> remote_encoders; |
|
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<Context> ctx(Context::create()); |
|
|
|
std::unique_ptr<Context> ctx(Context::create()); |
|
|
@ -207,11 +211,12 @@ void loggerd_thread() { |
|
|
|
SubSocket * sock = SubSocket::create(ctx.get(), it.name); |
|
|
|
SubSocket * sock = SubSocket::create(ctx.get(), it.name); |
|
|
|
assert(sock != NULL); |
|
|
|
assert(sock != NULL); |
|
|
|
poller->registerSocket(sock); |
|
|
|
poller->registerSocket(sock); |
|
|
|
qlog_states[sock] = { |
|
|
|
service_state[sock] = { |
|
|
|
.name = it.name, |
|
|
|
.name = it.name, |
|
|
|
.counter = 0, |
|
|
|
.counter = 0, |
|
|
|
.freq = it.decimation, |
|
|
|
.freq = it.decimation, |
|
|
|
.encoder = encoder, |
|
|
|
.encoder = encoder, |
|
|
|
|
|
|
|
.user_flag = (strcmp(it.name, "userFlag") == 0), |
|
|
|
}; |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -236,20 +241,19 @@ void loggerd_thread() { |
|
|
|
for (auto sock : poller->poll(1000)) { |
|
|
|
for (auto sock : poller->poll(1000)) { |
|
|
|
if (do_exit) break; |
|
|
|
if (do_exit) break; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ServiceState &service = service_state[sock]; |
|
|
|
|
|
|
|
if (service.user_flag) { |
|
|
|
|
|
|
|
handle_user_flag(&s); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// drain socket
|
|
|
|
// drain socket
|
|
|
|
int count = 0; |
|
|
|
int count = 0; |
|
|
|
QlogState &qs = qlog_states[sock]; |
|
|
|
|
|
|
|
Message *msg = nullptr; |
|
|
|
Message *msg = nullptr; |
|
|
|
while (!do_exit && (msg = sock->receive(true))) { |
|
|
|
while (!do_exit && (msg = sock->receive(true))) { |
|
|
|
const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0); |
|
|
|
const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0); |
|
|
|
|
|
|
|
if (service.encoder) { |
|
|
|
if (qs.name == "userFlag") { |
|
|
|
|
|
|
|
handle_user_flag(&s); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (qs.encoder) { |
|
|
|
|
|
|
|
s.last_camera_seen_tms = millis_since_boot(); |
|
|
|
s.last_camera_seen_tms = millis_since_boot(); |
|
|
|
bytes_count += handle_encoder_msg(&s, msg, qs.name, remote_encoders[sock], encoder_infos_dict[qs.name]); |
|
|
|
bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); |
|
|
|
logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); |
|
|
|
bytes_count += msg->getSize(); |
|
|
|
bytes_count += msg->getSize(); |
|
|
@ -265,7 +269,7 @@ void loggerd_thread() { |
|
|
|
|
|
|
|
|
|
|
|
count++; |
|
|
|
count++; |
|
|
|
if (count >= 200) { |
|
|
|
if (count >= 200) { |
|
|
|
LOGD("large volume of '%s' messages", qs.name.c_str()); |
|
|
|
LOGD("large volume of '%s' messages", service.name.c_str()); |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -282,7 +286,7 @@ void loggerd_thread() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// messaging cleanup
|
|
|
|
// messaging cleanup
|
|
|
|
for (auto &[sock, qs] : qlog_states) delete sock; |
|
|
|
for (auto &[sock, service] : service_state) delete sock; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
int main(int argc, char** argv) { |
|
|
|
int main(int argc, char** argv) { |
|
|
|