Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/ddprof_worker_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <array>
#include <chrono>
#include <string>

namespace ddprof {

Expand Down Expand Up @@ -42,6 +43,7 @@ struct DDProfWorkerContext {
uint32_t count_worker{0}; // exports since last cache clear
std::array<uint64_t, kMaxTypeWatcher> lost_events_per_watcher{};
LiveAllocation live_allocation;
std::string metrics_json[2]; // double-buffered alongside pprof
int64_t perfclock_offset;
PerfClock::time_point last_processed_event_timestamp;
};
Expand Down
2 changes: 2 additions & 0 deletions include/exporter/ddprof_exporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "tags.hpp"

#include <datadog/common.h>
#include <string>

struct ddog_prof_ProfileExporter;
struct ddog_prof_Profile;
Expand All @@ -37,6 +38,7 @@ DDRes ddprof_exporter_new(const UserTags *user_tags, DDProfExporter *exporter);

// Export `profile` through `exporter` using a blocking send.
// `additional_tags` are forwarded with the request as extra tags.
// When `metrics_json` is non-empty, its bytes are attached to the upload as a
// file named "metrics.json"; an empty string attaches no extra file.
DDRes ddprof_exporter_export(ddog_prof_Profile *profile,
const Tags &additional_tags, uint32_t profile_seq,
const std::string &metrics_json,
DDProfExporter *exporter);

DDRes ddprof_exporter_free(DDProfExporter *exporter);
Expand Down
2 changes: 2 additions & 0 deletions include/live_allocation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class LiveAllocation {
PprofStacks _unique_stacks;
uint32_t _address_conflict_count = 0;
uint32_t _tracked_address_count = 0;
uint32_t _active_shards = 0;
};

using PidMap = std::unordered_map<pid_t, PidStacks>;
Expand All @@ -63,6 +64,7 @@ class LiveAllocation {
PidStacks &pid_stacks = pid_map[pid];
pid_stacks._address_conflict_count = address_conflict_count;
pid_stacks._tracked_address_count = tracked_address_count;
pid_stacks._active_shards = active_shards;
LG_NTC("<%u> PID %d: live allocations=%lu, Unique "
"stacks=%lu, lib tracked addresses=%u, lib active shards=%u, lib "
"address conflicts=%u",
Expand Down
3 changes: 3 additions & 0 deletions include/procutils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

namespace ddprof {

// Get internal stats from /proc/<pid>/stat
DDRes proc_read_pid(pid_t pid, ProcStatus *procstat);

// Get internal stats from /proc/self/stat
DDRes proc_read(ProcStatus *procstat);

Expand Down
67 changes: 64 additions & 3 deletions src/ddprof_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
#include "unwind_state.hpp"

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <ctime>
#include <set>
#include <string>
#include <sys/time.h>
#include <unistd.h>

Expand Down Expand Up @@ -492,9 +496,9 @@ void *ddprof_worker_export_thread(void *arg) {
// gets joined forcefully, we should not resume on same value
uint32_t const profile_seq = (worker->persistent_worker_state->profile_seq)++;

if (IsDDResFatal(ddprof_exporter_export(&worker->pprof[i]->_profile,
worker->pprof[i]->_tags, profile_seq,
worker->exp[i]))) {
if (IsDDResFatal(ddprof_exporter_export(
&worker->pprof[i]->_profile, worker->pprof[i]->_tags, profile_seq,
worker->metrics_json[i], worker->exp[i]))) {
LG_NFO("Failed to export from worker");
worker->exp_error = true;
}
Expand Down Expand Up @@ -570,6 +574,59 @@ DDRes worker_library_free(DDProfContext &ctx) {
return {};
}

std::string build_metrics_json(const LiveAllocation &live_allocations) {
int64_t total_rss = 0;
int64_t total_vsize = 0;
unsigned long total_minflt = 0;
unsigned long total_majflt = 0;
uint64_t total_arena_count = 0;
uint64_t total_tracked_addresses = 0;
uint64_t total_unique_stacks = 0;

const long page_size = get_page_size();

// Collect unique PIDs across all watchers (a PID may appear in multiple
// watcher positions) to avoid double-counting procfs stats.
std::set<pid_t> seen_pids;

for (const auto &pid_map : live_allocations._watcher_vector) {
for (const auto &[pid, pid_stacks] : pid_map) {
// Read procfs only once per PID
if (seen_pids.insert(pid).second) {
ProcStatus proc_status{};
if (IsDDResOK(proc_read_pid(pid, &proc_status))) {
total_rss += static_cast<int64_t>(proc_status.rss) * page_size;
total_vsize += static_cast<int64_t>(proc_status.vsize);
total_minflt += proc_status.minflt;
total_majflt += proc_status.majflt;
} else {
LG_DBG("Failed to read /proc/%d/stat for metrics (process may be "
"starting or exiting)",
pid);
}
}
// Allocation tracker stats are per watcher, so always accumulate
total_arena_count += pid_stacks._active_shards;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: slightly unexpected if we start adding these...

total_tracked_addresses += pid_stacks._tracked_address_count;
total_unique_stacks += pid_stacks._unique_stacks.size();
}
}

char buf[512];
snprintf(buf, sizeof(buf),
"[[\"native_rss\",%" PRId64 "],"
"[\"native_vsize\",%" PRId64 "],"
"[\"native_minor_faults\",%lu],"
"[\"native_major_faults\",%lu],"
"[\"native_arena_count\",%" PRIu64 "],"
"[\"native_tracked_addresses\",%" PRIu64 "],"
"[\"native_unique_stacks\",%" PRIu64 "]]",
total_rss, total_vsize, total_minflt, total_majflt,
total_arena_count, total_tracked_addresses, total_unique_stacks);
LG_DBG("metrics.json: %s", buf);
return buf;
}

/// Cycle operations : export, sync metrics, update counters
DDRes ddprof_worker_cycle(DDProfContext &ctx,
std::chrono::steady_clock::time_point now,
Expand All @@ -579,6 +636,10 @@ DDRes ddprof_worker_cycle(DDProfContext &ctx,
DDRES_CHECK_FWD(clear_unvisited_pids(ctx));
DDRES_CHECK_FWD(aggregate_live_allocations(ctx));

// Build metrics.json for this cycle (before buffer swap)
ctx.worker_ctx.metrics_json[ctx.worker_ctx.i_current_pprof] =
build_metrics_json(ctx.worker_ctx.live_allocation);

// Take the current pprof contents and ship them to the backend. This also
// clears the pprof for reuse
// Dispatch happens in a thread, with the underlying data structure for
Expand Down
27 changes: 21 additions & 6 deletions src/exporter/ddprof_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ DDRes ddprof_exporter_new(const UserTags *user_tags, DDProfExporter *exporter) {

DDRes ddprof_exporter_export(ddog_prof_Profile *profile,
const Tags &additional_tags, uint32_t profile_seq,
const std::string &metrics_json,
DDProfExporter *exporter) {
DDRes res = ddres_init();
ddog_prof_Profile_SerializeResult serialized_result =
Expand Down Expand Up @@ -342,14 +343,28 @@ DDRes ddprof_exporter_export(ddog_prof_Profile *profile,

LG_NTC("[EXPORTER] Export buffer of size %lu", buffer->len);

// Build metrics.json file to send alongside the profile
ddog_prof_Exporter_Slice_File files_slice =
ddog_prof_Exporter_Slice_File_empty();
ddog_prof_Exporter_File metrics_file{};
if (!metrics_json.empty()) {
metrics_file.name = to_CharSlice("metrics.json");
metrics_file.file = {
.ptr = reinterpret_cast<const uint8_t *>(metrics_json.data()),
.len = metrics_json.size()};
files_slice = {.ptr = &metrics_file, .len = 1};
LG_NTC("[EXPORTER] Attaching metrics.json (%zu bytes)",
metrics_json.size());
}

ddog_prof_Result_HttpStatus result = ddog_prof_Exporter_send_blocking(
&exporter->_exporter, encoded_profile,
ddog_prof_Exporter_Slice_File_empty(), // files_to_compress_and_export
&ffi_additional_tags, // optional_additional_tags
nullptr, // optional_process_tags
nullptr, // optional_internal_metadata_json
nullptr, // optional_info_json
nullptr // cancellation_token
files_slice, // files_to_compress_and_export
&ffi_additional_tags, // optional_additional_tags
nullptr, // optional_process_tags
nullptr, // optional_internal_metadata_json
nullptr, // optional_info_json
nullptr // cancellation_token
);

if (result.tag == DDOG_PROF_RESULT_HTTP_STATUS_ERR_HTTP_STATUS) {
Expand Down
14 changes: 10 additions & 4 deletions src/procutils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ char StatusLine[] =

}

DDRes proc_read(ProcStatus *procstat) {
UniqueFile const f{fopen("/proc/self/stat", "r")};
DDRes proc_read_pid(pid_t pid, ProcStatus *procstat) {
char path[64];
snprintf(path, sizeof(path), "/proc/%d/stat", pid);
UniqueFile const f{fopen(path, "r")};
if (!f) {
DDRES_RETURN_ERROR_LOG(DD_WHAT_PROCSTATE, "Failed to open /proc/self/stat");
DDRES_RETURN_ERROR_LOG(DD_WHAT_PROCSTATE, "Failed to open %s", path);
}

if (0 > fscanf(f.get(), StatusLine, &procstat->pid, &procstat->comm,
Expand All @@ -49,11 +51,15 @@ DDRes proc_read(ProcStatus *procstat) {
&procstat->start_data, &procstat->end_data,
&procstat->start_brk, &procstat->arg_start, &procstat->arg_end,
&procstat->env_start, &procstat->env_end)) {
DDRES_RETURN_ERROR_LOG(DD_WHAT_PROCSTATE, "Failed to read /proc/self/stat");
DDRES_RETURN_ERROR_LOG(DD_WHAT_PROCSTATE, "Failed to read %s", path);
}
return {};
}

// Read the calling process' own stat entry by delegating to proc_read_pid()
// with the PID returned by getpid(). The result is written into `procstat`
// and the delegate's DDRes is returned unchanged.
DDRes proc_read(ProcStatus *procstat) {
  const pid_t self_pid = getpid();
  return proc_read_pid(self_pid, procstat);
}

bool check_file_type(const char *pathname, int file_type) {
struct stat info;

Expand Down