Diffstat (limited to 'tools/perf/builtin-record.c')
 tools/perf/builtin-record.c | 1168 +++++++++++++++++++++++++++++++++++------
 1 file changed, 1100 insertions(+), 68 deletions(-)
diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index bb716c953d02..ba74fab02e62 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -51,6 +51,7 @@
#include "util/evlist-hybrid.h"
#include "asm/bug.h"
#include "perf.h"
+#include "cputopo.h"
#include <errno.h>
#include <inttypes.h>
@@ -58,6 +59,9 @@
#include <poll.h>
#include <pthread.h>
#include <unistd.h>
+#ifndef HAVE_GETTID
+#include <syscall.h>
+#endif
#include <sched.h>
#include <signal.h>
#ifdef HAVE_EVENTFD_SUPPORT
@@ -87,6 +91,57 @@ struct switch_output {
int cur_file;
};
+struct thread_mask {
+ struct mmap_cpu_mask maps;
+ struct mmap_cpu_mask affinity;
+};
+
+struct record_thread {
+ pid_t tid;
+ struct thread_mask *mask;
+ struct {
+ int msg[2];
+ int ack[2];
+ } pipes;
+ struct fdarray pollfd;
+ int ctlfd_pos;
+ int nr_mmaps;
+ struct mmap **maps;
+ struct mmap **overwrite_maps;
+ struct record *rec;
+ unsigned long long samples;
+ unsigned long waking;
+ u64 bytes_written;
+ u64 bytes_transferred;
+ u64 bytes_compressed;
+};
+
+static __thread struct record_thread *thread;
+
+enum thread_msg {
+ THREAD_MSG__UNDEFINED = 0,
+ THREAD_MSG__READY,
+ THREAD_MSG__MAX,
+};
+
+static const char *thread_msg_tags[THREAD_MSG__MAX] = {
+ "UNDEFINED", "READY"
+};
+
+enum thread_spec {
+ THREAD_SPEC__UNDEFINED = 0,
+ THREAD_SPEC__CPU,
+ THREAD_SPEC__CORE,
+ THREAD_SPEC__PACKAGE,
+ THREAD_SPEC__NUMA,
+ THREAD_SPEC__USER,
+ THREAD_SPEC__MAX,
+};
+
+static const char *thread_spec_tags[THREAD_SPEC__MAX] = {
+ "undefined", "cpu", "core", "package", "numa", "user"
+};
+
struct record {
struct perf_tool tool;
struct record_opts opts;
@@ -109,9 +164,11 @@ struct record {
bool timestamp_boundary;
struct switch_output switch_output;
unsigned long long samples;
- struct mmap_cpu_mask affinity_mask;
unsigned long output_max_size; /* = 0: unlimited */
struct perf_debuginfod debuginfod;
+ int nr_threads;
+ struct thread_mask *thread_masks;
+ struct record_thread *thread_data;
};
static volatile int done;
@@ -124,6 +181,18 @@ static const char *affinity_tags[PERF_AFFINITY_MAX] = {
"SYS", "NODE", "CPU"
};
+#ifndef HAVE_GETTID
+static inline pid_t gettid(void)
+{
+ return (pid_t)syscall(__NR_gettid);
+}
+#endif
+
+static int record__threads_enabled(struct record *rec)
+{
+ return rec->opts.threads_spec;
+}
+
static bool switch_output_signal(struct record *rec)
{
return rec->switch_output.signal &&
@@ -143,10 +212,22 @@ static bool switch_output_time(struct record *rec)
trigger_is_ready(&switch_output_trigger);
}
+static u64 record__bytes_written(struct record *rec)
+{
+ int t;
+ u64 bytes_written = rec->bytes_written;
+ struct record_thread *thread_data = rec->thread_data;
+
+ for (t = 0; t < rec->nr_threads; t++)
+ bytes_written += thread_data[t].bytes_written;
+
+ return bytes_written;
+}
+
static bool record__output_max_size_exceeded(struct record *rec)
{
return rec->output_max_size &&
- (rec->bytes_written >= rec->output_max_size);
+ (record__bytes_written(rec) >= rec->output_max_size);
}
static int record__write(struct record *rec, struct mmap *map __maybe_unused,
@@ -154,17 +235,23 @@ static int record__write(struct record *rec, struct mmap *map __maybe_unused,
{
struct perf_data_file *file = &rec->session->data->file;
+ if (map && map->file)
+ file = map->file;
+
if (perf_data_file__write(file, bf, size) < 0) {
pr_err("failed to write perf data, error: %m\n");
return -1;
}
- rec->bytes_written += size;
+ if (map && map->file)
+ thread->bytes_written += size;
+ else
+ rec->bytes_written += size;
if (record__output_max_size_exceeded(rec) && !done) {
fprintf(stderr, "[ perf record: perf size limit reached (%" PRIu64 " KB),"
" stopping session ]\n",
- rec->bytes_written >> 10);
+ record__bytes_written(rec) >> 10);
done = 1;
}
@@ -176,8 +263,8 @@ static int record__write(struct record *rec, struct mmap *map __maybe_unused,
static int record__aio_enabled(struct record *rec);
static int record__comp_enabled(struct record *rec);
-static size_t zstd_compress(struct perf_session *session, void *dst, size_t dst_size,
- void *src, size_t src_size);
+static size_t zstd_compress(struct perf_session *session, struct mmap *map,
+ void *dst, size_t dst_size, void *src, size_t src_size);
#ifdef HAVE_AIO_SUPPORT
static int record__aio_write(struct aiocb *cblock, int trace_fd,
@@ -311,7 +398,7 @@ static int record__aio_pushfn(struct mmap *map, void *to, void *buf, size_t size
*/
if (record__comp_enabled(aio->rec)) {
- size = zstd_compress(aio->rec->session, aio->data + aio->size,
+ size = zstd_compress(aio->rec->session, NULL, aio->data + aio->size,
mmap__mmap_len(map) - aio->size,
buf, size);
} else {
@@ -538,11 +625,11 @@ static int record__pushfn(struct mmap *map, void *to, void *bf, size_t size)
struct record *rec = to;
if (record__comp_enabled(rec)) {
- size = zstd_compress(rec->session, map->data, mmap__mmap_len(map), bf, size);
+ size = zstd_compress(rec->session, map, map->data, mmap__mmap_len(map), bf, size);
bf = map->data;
}
- rec->samples++;
+ thread->samples++;
return record__write(rec, map, bf, size);
}
@@ -718,6 +805,12 @@ static int record__auxtrace_init(struct record *rec)
{
int err;
+ if ((rec->opts.auxtrace_snapshot_opts || rec->opts.auxtrace_sample_opts)
+ && record__threads_enabled(rec)) {
+ pr_err("AUX area tracing options are not available in parallel streaming mode.\n");
+ return -EINVAL;
+ }
+
if (!rec->itr) {
rec->itr = auxtrace_record__init(rec->evlist, &err);
if (err)
@@ -841,9 +934,218 @@ static int record__kcore_copy(struct machine *machine, struct perf_data *data)
return kcore_copy(from_dir, kcore_dir);
}
+static void record__thread_data_init_pipes(struct record_thread *thread_data)
+{
+ thread_data->pipes.msg[0] = -1;
+ thread_data->pipes.msg[1] = -1;
+ thread_data->pipes.ack[0] = -1;
+ thread_data->pipes.ack[1] = -1;
+}
+
+static int record__thread_data_open_pipes(struct record_thread *thread_data)
+{
+ if (pipe(thread_data->pipes.msg))
+ return -EINVAL;
+
+ if (pipe(thread_data->pipes.ack)) {
+ close(thread_data->pipes.msg[0]);
+ thread_data->pipes.msg[0] = -1;
+ close(thread_data->pipes.msg[1]);
+ thread_data->pipes.msg[1] = -1;
+ return -EINVAL;
+ }
+
+ pr_debug2("thread_data[%p]: msg=[%d,%d], ack=[%d,%d]\n", thread_data,
+ thread_data->pipes.msg[0], thread_data->pipes.msg[1],
+ thread_data->pipes.ack[0], thread_data->pipes.ack[1]);
+
+ return 0;
+}
+
+static void record__thread_data_close_pipes(struct record_thread *thread_data)
+{
+ if (thread_data->pipes.msg[0] != -1) {
+ close(thread_data->pipes.msg[0]);
+ thread_data->pipes.msg[0] = -1;
+ }
+ if (thread_data->pipes.msg[1] != -1) {
+ close(thread_data->pipes.msg[1]);
+ thread_data->pipes.msg[1] = -1;
+ }
+ if (thread_data->pipes.ack[0] != -1) {
+ close(thread_data->pipes.ack[0]);
+ thread_data->pipes.ack[0] = -1;
+ }
+ if (thread_data->pipes.ack[1] != -1) {
+ close(thread_data->pipes.ack[1]);
+ thread_data->pipes.ack[1] = -1;
+ }
+}
+
+static int record__thread_data_init_maps(struct record_thread *thread_data, struct evlist *evlist)
+{
+ int m, tm, nr_mmaps = evlist->core.nr_mmaps;
+ struct mmap *mmap = evlist->mmap;
+ struct mmap *overwrite_mmap = evlist->overwrite_mmap;
+ struct perf_cpu_map *cpus = evlist->core.user_requested_cpus;
+
+ thread_data->nr_mmaps = bitmap_weight(thread_data->mask->maps.bits,
+ thread_data->mask->maps.nbits);
+ if (mmap) {
+ thread_data->maps = zalloc(thread_data->nr_mmaps * sizeof(struct mmap *));
+ if (!thread_data->maps)
+ return -ENOMEM;
+ }
+ if (overwrite_mmap) {
+ thread_data->overwrite_maps = zalloc(thread_data->nr_mmaps * sizeof(struct mmap *));
+ if (!thread_data->overwrite_maps) {
+ zfree(&thread_data->maps);
+ return -ENOMEM;
+ }
+ }
+ pr_debug2("thread_data[%p]: nr_mmaps=%d, maps=%p, ow_maps=%p\n", thread_data,
+ thread_data->nr_mmaps, thread_data->maps, thread_data->overwrite_maps);
+
+ for (m = 0, tm = 0; m < nr_mmaps && tm < thread_data->nr_mmaps; m++) {
+ if (test_bit(cpus->map[m].cpu, thread_data->mask->maps.bits)) {
+ if (thread_data->maps) {
+ thread_data->maps[tm] = &mmap[m];
+ pr_debug2("thread_data[%p]: cpu%d: maps[%d] -> mmap[%d]\n",
+ thread_data, cpus->map[m].cpu, tm, m);
+ }
+ if (thread_data->overwrite_maps) {
+ thread_data->overwrite_maps[tm] = &overwrite_mmap[m];
+ pr_debug2("thread_data[%p]: cpu%d: ow_maps[%d] -> ow_mmap[%d]\n",
+ thread_data, cpus->map[m].cpu, tm, m);
+ }
+ tm++;
+ }
+ }
+
+ return 0;
+}
+
+static int record__thread_data_init_pollfd(struct record_thread *thread_data, struct evlist *evlist)
+{
+ int f, tm, pos;
+ struct mmap *map, *overwrite_map;
+
+ fdarray__init(&thread_data->pollfd, 64);
+
+ for (tm = 0; tm < thread_data->nr_mmaps; tm++) {
+ map = thread_data->maps ? thread_data->maps[tm] : NULL;
+ overwrite_map = thread_data->overwrite_maps ?
+ thread_data->overwrite_maps[tm] : NULL;
+
+ for (f = 0; f < evlist->core.pollfd.nr; f++) {
+ void *ptr = evlist->core.pollfd.priv[f].ptr;
+
+ if ((map && ptr == map) || (overwrite_map && ptr == overwrite_map)) {
+ pos = fdarray__dup_entry_from(&thread_data->pollfd, f,
+ &evlist->core.pollfd);
+ if (pos < 0)
+ return pos;
+ pr_debug2("thread_data[%p]: pollfd[%d] <- event_fd=%d\n",
+ thread_data, pos, evlist->core.pollfd.entries[f].fd);
+ }
+ }
+ }
+
+ return 0;
+}
+
+static void record__free_thread_data(struct record *rec)
+{
+ int t;
+ struct record_thread *thread_data = rec->thread_data;
+
+ if (thread_data == NULL)
+ return;
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ record__thread_data_close_pipes(&thread_data[t]);
+ zfree(&thread_data[t].maps);
+ zfree(&thread_data[t].overwrite_maps);
+ fdarray__exit(&thread_data[t].pollfd);
+ }
+
+ zfree(&rec->thread_data);
+}
+
+static int record__alloc_thread_data(struct record *rec, struct evlist *evlist)
+{
+ int t, ret;
+ struct record_thread *thread_data;
+
+ rec->thread_data = zalloc(rec->nr_threads * sizeof(*(rec->thread_data)));
+ if (!rec->thread_data) {
+ pr_err("Failed to allocate thread data\n");
+ return -ENOMEM;
+ }
+ thread_data = rec->thread_data;
+
+ for (t = 0; t < rec->nr_threads; t++)
+ record__thread_data_init_pipes(&thread_data[t]);
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ thread_data[t].rec = rec;
+ thread_data[t].mask = &rec->thread_masks[t];
+ ret = record__thread_data_init_maps(&thread_data[t], evlist);
+ if (ret) {
+ pr_err("Failed to initialize thread[%d] maps\n", t);
+ goto out_free;
+ }
+ ret = record__thread_data_init_pollfd(&thread_data[t], evlist);
+ if (ret) {
+ pr_err("Failed to initialize thread[%d] pollfd\n", t);
+ goto out_free;
+ }
+ if (t) {
+ thread_data[t].tid = -1;
+ ret = record__thread_data_open_pipes(&thread_data[t]);
+ if (ret) {
+ pr_err("Failed to open thread[%d] communication pipes\n", t);
+ goto out_free;
+ }
+ ret = fdarray__add(&thread_data[t].pollfd, thread_data[t].pipes.msg[0],
+ POLLIN | POLLERR | POLLHUP, fdarray_flag__nonfilterable);
+ if (ret < 0) {
+ pr_err("Failed to add descriptor to thread[%d] pollfd\n", t);
+ goto out_free;
+ }
+ thread_data[t].ctlfd_pos = ret;
+ pr_debug2("thread_data[%p]: pollfd[%d] <- ctl_fd=%d\n",
+ thread_data, thread_data[t].ctlfd_pos,
+ thread_data[t].pipes.msg[0]);
+ } else {
+ thread_data[t].tid = gettid();
+ if (evlist->ctl_fd.pos == -1)
+ continue;
+ ret = fdarray__dup_entry_from(&thread_data[t].pollfd, evlist->ctl_fd.pos,
+ &evlist->core.pollfd);
+ if (ret < 0) {
+ pr_err("Failed to duplicate descriptor in main thread pollfd\n");
+ goto out_free;
+ }
+ thread_data[t].ctlfd_pos = ret;
+ pr_debug2("thread_data[%p]: pollfd[%d] <- ctl_fd=%d\n",
+ thread_data, thread_data[t].ctlfd_pos,
+ evlist->core.pollfd.entries[evlist->ctl_fd.pos].fd);
+ }
+ }
+
+ return 0;
+
+out_free:
+ record__free_thread_data(rec);
+
+ return ret;
+}
+
static int record__mmap_evlist(struct record *rec,
struct evlist *evlist)
{
+ int i, ret;
struct record_opts *opts = &rec->opts;
bool auxtrace_overwrite = opts->auxtrace_snapshot_mode ||
opts->auxtrace_sample_mode;
@@ -874,6 +1176,28 @@ static int record__mmap_evlist(struct record *rec,
return -EINVAL;
}
}
+
+ if (evlist__initialize_ctlfd(evlist, opts->ctl_fd, opts->ctl_fd_ack))
+ return -1;
+
+ ret = record__alloc_thread_data(rec, evlist);
+ if (ret)
+ return ret;
+
+ if (record__threads_enabled(rec)) {
+ ret = perf_data__create_dir(&rec->data, evlist->core.nr_mmaps);
+ if (ret) {
+ pr_err("Failed to create data directory: %s\n", strerror(-ret));
+ return ret;
+ }
+ for (i = 0; i < evlist->core.nr_mmaps; i++) {
+ if (evlist->mmap)
+ evlist->mmap[i].file = &rec->data.dir.files[i];
+ if (evlist->overwrite_mmap)
+ evlist->overwrite_mmap[i].file = &rec->data.dir.files[i];
+ }
+ }
+
return 0;
}
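
In parallel streaming mode the trace output becomes a directory rather than a single file: perf_data__create_dir() allocates one output file per mmap, and each mmap (and its overwrite twin) is pointed at its own file, so every streaming thread writes its buffers without contending on a shared descriptor. The resulting layout is roughly the following sketch (assuming the data.<N> per-CPU file naming of perf_data__create_dir()):

    perf.data/
        data.0    <- evlist->mmap[0]
        data.1    <- evlist->mmap[1]
        ...
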
@@ -1065,15 +1389,17 @@ static struct perf_event_header finished_round_event = {
static void record__adjust_affinity(struct record *rec, struct mmap *map)
{
if (rec->opts.affinity != PERF_AFFINITY_SYS &&
- !bitmap_equal(rec->affinity_mask.bits, map->affinity_mask.bits,
- rec->affinity_mask.nbits)) {
- bitmap_zero(rec->affinity_mask.bits, rec->affinity_mask.nbits);
- bitmap_or(rec->affinity_mask.bits, rec->affinity_mask.bits,
- map->affinity_mask.bits, rec->affinity_mask.nbits);
- sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&rec->affinity_mask),
- (cpu_set_t *)rec->affinity_mask.bits);
- if (verbose == 2)
- mmap_cpu_mask__scnprintf(&rec->affinity_mask, "thread");
+ !bitmap_equal(thread->mask->affinity.bits, map->affinity_mask.bits,
+ thread->mask->affinity.nbits)) {
+ bitmap_zero(thread->mask->affinity.bits, thread->mask->affinity.nbits);
+ bitmap_or(thread->mask->affinity.bits, thread->mask->affinity.bits,
+ map->affinity_mask.bits, thread->mask->affinity.nbits);
+ sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread->mask->affinity),
+ (cpu_set_t *)thread->mask->affinity.bits);
+ if (verbose == 2) {
+ pr_debug("threads[%d]: running on cpu%d: ", thread->tid, sched_getcpu());
+ mmap_cpu_mask__scnprintf(&thread->mask->affinity, "affinity");
+ }
}
}
@@ -1093,17 +1419,26 @@ static size_t process_comp_header(void *record, size_t increment)
return size;
}
-static size_t zstd_compress(struct perf_session *session, void *dst, size_t dst_size,
- void *src, size_t src_size)
+static size_t zstd_compress(struct perf_session *session, struct mmap *map,
+ void *dst, size_t dst_size, void *src, size_t src_size)
{
size_t compressed;
size_t max_record_size = PERF_SAMPLE_MAX_SIZE - sizeof(struct perf_record_compressed) - 1;
+ struct zstd_data *zstd_data = &session->zstd_data;
+
+ if (map && map->file)
+ zstd_data = &map->zstd_data;
- compressed = zstd_compress_stream_to_records(&session->zstd_data, dst, dst_size, src, src_size,
+ compressed = zstd_compress_stream_to_records(zstd_data, dst, dst_size, src, src_size,
max_record_size, process_comp_header);
- session->bytes_transferred += src_size;
- session->bytes_compressed += compressed;
+ if (map && map->file) {
+ thread->bytes_transferred += src_size;
+ thread->bytes_compressed += compressed;
+ } else {
+ session->bytes_transferred += src_size;
+ session->bytes_compressed += compressed;
+ }
return compressed;
}
@@ -1114,14 +1449,17 @@ static int record__mmap_read_evlist(struct record *rec, struct evlist *evlist,
u64 bytes_written = rec->bytes_written;
int i;
int rc = 0;
- struct mmap *maps;
+ int nr_mmaps;
+ struct mmap **maps;
int trace_fd = rec->data.file.fd;
off_t off = 0;
if (!evlist)
return 0;
- maps = overwrite ? evlist->overwrite_mmap : evlist->mmap;
+ nr_mmaps = thread->nr_mmaps;
+ maps = overwrite ? thread->overwrite_maps : thread->maps;
+
if (!maps)
return 0;
@@ -1131,9 +1469,9 @@ static int record__mmap_read_evlist(struct record *rec, struct evlist *evlist,
if (record__aio_enabled(rec))
off = record__aio_get_pos(trace_fd);
- for (i = 0; i < evlist->core.nr_mmaps; i++) {
+ for (i = 0; i < nr_mmaps; i++) {
u64 flush = 0;
- struct mmap *map = &maps[i];
+ struct mmap *map = maps[i];
if (map->core.base) {
record__adjust_affinity(rec, map);
@@ -1175,8 +1513,12 @@ static int record__mmap_read_evlist(struct record *rec, struct evlist *evlist,
/*
* Mark the round finished in case we wrote
* at least one event.
+ *
+ * No need for round events in directory mode,
+ * because per-cpu maps and files have data
+ * sorted by the kernel.
*/
- if (bytes_written != rec->bytes_written)
+ if (!record__threads_enabled(rec) && bytes_written != rec->bytes_written)
rc = record__write(rec, NULL, &finished_round_event, sizeof(finished_round_event));
if (overwrite)
@@ -1196,6 +1538,77 @@ static int record__mmap_read_all(struct record *rec, bool synch)
return record__mmap_read_evlist(rec, rec->evlist, true, synch);
}
+static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
+ void *arg __maybe_unused)
+{
+ struct perf_mmap *map = fda->priv[fd].ptr;
+
+ if (map)
+ perf_mmap__put(map);
+}
+
+static void *record__thread(void *arg)
+{
+ enum thread_msg msg = THREAD_MSG__READY;
+ bool terminate = false;
+ struct fdarray *pollfd;
+ int err, ctlfd_pos;
+
+ thread = arg;
+ thread->tid = gettid();
+
+ err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+ if (err == -1)
+ pr_warning("threads[%d]: failed to notify on start: %s\n",
+ thread->tid, strerror(errno));
+
+ pr_debug("threads[%d]: started on cpu%d\n", thread->tid, sched_getcpu());
+
+ pollfd = &thread->pollfd;
+ ctlfd_pos = thread->ctlfd_pos;
+
+ for (;;) {
+ unsigned long long hits = thread->samples;
+
+ if (record__mmap_read_all(thread->rec, false) < 0 || terminate)
+ break;
+
+ if (hits == thread->samples) {
+
+ err = fdarray__poll(pollfd, -1);
+ /*
+ * Propagate error, only if there's any. Ignore positive
+ * number of returned events and interrupt error.
+ */
+ if (err > 0 || (err < 0 && errno == EINTR))
+ err = 0;
+ thread->waking++;
+
+ if (fdarray__filter(pollfd, POLLERR | POLLHUP,
+ record__thread_munmap_filtered, NULL) == 0)
+ break;
+ }
+
+ if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
+ terminate = true;
+ close(thread->pipes.msg[0]);
+ thread->pipes.msg[0] = -1;
+ pollfd->entries[ctlfd_pos].fd = -1;
+ pollfd->entries[ctlfd_pos].events = 0;
+ }
+
+ pollfd->entries[ctlfd_pos].revents = 0;
+ }
+ record__mmap_read_all(thread->rec, true);
+
+ err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+ if (err == -1)
+ pr_warning("threads[%d]: failed to notify on termination: %s\n",
+ thread->tid, strerror(errno));
+
+ return NULL;
+}
+
static void record__init_features(struct record *rec)
{
struct perf_session *session = rec->session;
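
record__thread() above is one half of a small pipe handshake: each worker writes THREAD_MSG__READY to its ack pipe on startup and again on termination, and learns that it should stop when the main thread closes the write end of the msg pipe, which surfaces as POLLHUP. A minimal standalone sketch of the same pattern, assuming simplified names and a joinable rather than detached thread:

    #include <poll.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    static int msg_pipe[2], ack_pipe[2];	/* main -> worker, worker -> main */

    static void *worker(void *arg)
    {
    	int ready = 1;
    	struct pollfd pfd = { .fd = msg_pipe[0], .events = POLLIN };

    	(void)arg;
    	/* notify start, mirroring THREAD_MSG__READY on thread->pipes.ack[1] */
    	if (write(ack_pipe[1], &ready, sizeof(ready)) < 0)
    		return NULL;
    	/* main thread closing msg_pipe[1] shows up here as POLLHUP */
    	poll(&pfd, 1, -1);
    	/* notify termination */
    	if (write(ack_pipe[1], &ready, sizeof(ready)) < 0)
    		return NULL;
    	return NULL;
    }

    int main(void)
    {
    	pthread_t t;
    	int ack = 0;

    	if (pipe(msg_pipe) || pipe(ack_pipe))
    		return 1;
    	if (pthread_create(&t, NULL, worker, NULL))
    		return 1;
    	if (read(ack_pipe[0], &ack, sizeof(ack)) > 0)	/* wait for start */
    		printf("worker started\n");
    	close(msg_pipe[1]);				/* request termination */
    	if (read(ack_pipe[0], &ack, sizeof(ack)) > 0)	/* wait for the ack */
    		printf("worker terminated\n");
    	pthread_join(t, NULL);
    	return 0;
    }
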
@@ -1222,7 +1635,9 @@ static void record__init_features(struct record *rec)
if (!rec->opts.use_clockid)
perf_header__clear_feat(&session->header, HEADER_CLOCK_DATA);
- perf_header__clear_feat(&session->header, HEADER_DIR_FORMAT);
+ if (!record__threads_enabled(rec))
+ perf_header__clear_feat(&session->header, HEADER_DIR_FORMAT);
+
if (!record__comp_enabled(rec))
perf_header__clear_feat(&session->header, HEADER_COMPRESSED);
@@ -1232,6 +1647,7 @@ static void record__init_features(struct record *rec)
static void
record__finish_output(struct record *rec)
{
+ int i;
struct perf_data *data = &rec->data;
int fd = perf_data__fd(data);
@@ -1240,6 +1656,10 @@ record__finish_output(struct record *rec)
rec->session->header.data_size += rec->bytes_written;
data->file.size = lseek(perf_data__fd(data), 0, SEEK_CUR);
+ if (record__threads_enabled(rec)) {
+ for (i = 0; i < data->dir.nr; i++)
+ data->dir.files[i].size = lseek(data->dir.files[i].fd, 0, SEEK_CUR);
+ }
if (!rec->no_buildid) {
process_buildids(rec);
@@ -1461,7 +1881,7 @@ static int record__synthesize(struct record *rec, bool tail)
return err;
}
- err = perf_event__synthesize_cpu_map(&rec->tool, rec->evlist->core.cpus,
+ err = perf_event__synthesize_cpu_map(&rec->tool, rec->evlist->core.user_requested_cpus,
process_synthesized_event, NULL);
if (err < 0) {
pr_err("Couldn't synthesize cpu map.\n");
@@ -1619,11 +2039,129 @@ static void record__uniquify_name(struct record *rec)
}
}
+static int record__terminate_thread(struct record_thread *thread_data)
+{
+ int err;
+ enum thread_msg ack = THREAD_MSG__UNDEFINED;
+ pid_t tid = thread_data->tid;
+
+ close(thread_data->pipes.msg[1]);
+ thread_data->pipes.msg[1] = -1;
+ err = read(thread_data->pipes.ack[0], &ack, sizeof(ack));
+ if (err > 0)
+ pr_debug2("threads[%d]: sent %s\n", tid, thread_msg_tags[ack]);
+ else
+ pr_warning("threads[%d]: failed to receive termination notification from %d\n",
+ thread->tid, tid);
+
+ return 0;
+}
+
+static int record__start_threads(struct record *rec)
+{
+ int t, tt, err, ret = 0, nr_threads = rec->nr_threads;
+ struct record_thread *thread_data = rec->thread_data;
+ sigset_t full, mask;
+ pthread_t handle;
+ pthread_attr_t attrs;
+
+ thread = &thread_data[0];
+
+ if (!record__threads_enabled(rec))
+ return 0;
+
+ sigfillset(&full);
+ if (sigprocmask(SIG_SETMASK, &full, &mask)) {
+ pr_err("Failed to block signals on threads start: %s\n", strerror(errno));
+ return -1;
+ }
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ for (t = 1; t < nr_threads; t++) {
+ enum thread_msg msg = THREAD_MSG__UNDEFINED;
+
+#ifdef HAVE_PTHREAD_ATTR_SETAFFINITY_NP
+ pthread_attr_setaffinity_np(&attrs,
+ MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
+ (cpu_set_t *)(thread_data[t].mask->affinity.bits));
+#endif
+ if (pthread_create(&handle, &attrs, record__thread, &thread_data[t])) {
+ for (tt = 1; tt < t; tt++)
+ record__terminate_thread(&thread_data[tt]);
+ pr_err("Failed to start threads: %s\n", strerror(errno));
+ ret = -1;
+ goto out_err;
+ }
+
+ err = read(thread_data[t].pipes.ack[0], &msg, sizeof(msg));
+ if (err > 0)
+ pr_debug2("threads[%d]: sent %s\n", rec->thread_data[t].tid,
+ thread_msg_tags[msg]);
+ else
+ pr_warning("threads[%d]: failed to receive start notification from %d\n",
+ thread->tid, rec->thread_data[t].tid);
+ }
+
+ sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread->mask->affinity),
+ (cpu_set_t *)thread->mask->affinity.bits);
+
+ pr_debug("threads[%d]: started on cpu%d\n", thread->tid, sched_getcpu());
+
+out_err:
+ pthread_attr_destroy(&attrs);
+
+ if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
+ pr_err("Failed to unblock signals on threads start: %s\n", strerror(errno));
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static int record__stop_threads(struct record *rec)
+{
+ int t;
+ struct record_thread *thread_data = rec->thread_data;
+
+ for (t = 1; t < rec->nr_threads; t++)
+ record__terminate_thread(&thread_data[t]);
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ rec->samples += thread_data[t].samples;
+ if (!record__threads_enabled(rec))
+ continue;
+ rec->session->bytes_transferred += thread_data[t].bytes_transferred;
+ rec->session->bytes_compressed += thread_data[t].bytes_compressed;
+ pr_debug("threads[%d]: samples=%lld, wakes=%ld, ", thread_data[t].tid,
+ thread_data[t].samples, thread_data[t].waking);
+ if (thread_data[t].bytes_transferred && thread_data[t].bytes_compressed)
+ pr_debug("transferred=%" PRIu64 ", compressed=%" PRIu64 "\n",
+ thread_data[t].bytes_transferred, thread_data[t].bytes_compressed);
+ else
+ pr_debug("written=%" PRIu64 "\n", thread_data[t].bytes_written);
+ }
+
+ return 0;
+}
+
+static unsigned long record__waking(struct record *rec)
+{
+ int t;
+ unsigned long waking = 0;
+ struct record_thread *thread_data = rec->thread_data;
+
+ for (t = 0; t < rec->nr_threads; t++)
+ waking += thread_data[t].waking;
+
+ return waking;
+}
+
static int __cmd_record(struct record *rec, int argc, const char **argv)
{
int err;
int status = 0;
- unsigned long waking = 0;
const bool forks = argc > 0;
struct perf_tool *tool = &rec->tool;
struct record_opts *opts = &rec->opts;
@@ -1668,6 +2206,17 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
return PTR_ERR(session);
}
+ if (record__threads_enabled(rec)) {
+ if (perf_data__is_pipe(&rec->data)) {
+ pr_err("Parallel trace streaming is not available in pipe mode.\n");
+ return -1;
+ }
+ if (rec->opts.full_auxtrace) {
+ pr_err("Parallel trace streaming is not available in AUX area tracing mode.\n");
+ return -1;
+ }
+ }
+
fd = perf_data__fd(data);
rec->session = session;
@@ -1727,7 +2276,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (record__open(rec) != 0) {
err = -1;
- goto out_child;
+ goto out_free_threads;
}
session->header.env.comp_mmap_len = session->evlist->core.mmap_len;
@@ -1735,7 +2284,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
err = record__kcore_copy(&session->machines.host, data);
if (err) {
pr_err("ERROR: Failed to copy kcore\n");
- goto out_child;
+ goto out_free_threads;
}
}
@@ -1746,7 +2295,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
bpf__strerror_apply_obj_config(err, errbuf, sizeof(errbuf));
pr_err("ERROR: Apply config to BPF failed: %s\n",
errbuf);
- goto out_child;
+ goto out_free_threads;
}
/*
@@ -1764,11 +2313,11 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (data->is_pipe) {
err = perf_header__write_pipe(fd);
if (err < 0)
- goto out_child;
+ goto out_free_threads;
} else {
err = perf_session__write_header(session, rec->evlist, fd, false);
if (err < 0)
- goto out_child;
+ goto out_free_threads;
}
err = -1;
@@ -1776,16 +2325,16 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
&& !perf_header__has_feat(&session->header, HEADER_BUILD_ID)) {
pr_err("Couldn't generate buildids. "
"Use --no-buildid to profile anyway.\n");
- goto out_child;
+ goto out_free_threads;
}
err = record__setup_sb_evlist(rec);
if (err)
- goto out_child;
+ goto out_free_threads;
err = record__synthesize(rec, false);
if (err < 0)
- goto out_child;
+ goto out_free_threads;
if (rec->realtime_prio) {
struct sched_param param;
@@ -1794,10 +2343,13 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (sched_setscheduler(0, SCHED_FIFO, &param)) {
pr_err("Could not set realtime priority.\n");
err = -1;
- goto out_child;
+ goto out_free_threads;
}
}
+ if (record__start_threads(rec))
+ goto out_free_threads;
+
/*
* When perf is starting the traced process, all the events
* (apart from group members) have enable_on_exec=1 set,
@@ -1855,9 +2407,6 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
evlist__start_workload(rec->evlist);
}
- if (evlist__initialize_ctlfd(rec->evlist, opts->ctl_fd, opts->ctl_fd_ack))
- goto out_child;
-
if (opts->initial_delay) {
pr_info(EVLIST_DISABLED_MSG);
if (opts->initial_delay > 0) {
@@ -1871,7 +2420,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
trigger_ready(&switch_output_trigger);
perf_hooks__invoke_record_start();
for (;;) {
- unsigned long long hits = rec->samples;
+ unsigned long long hits = thread->samples;
/*
* rec->evlist->bkw_mmap_state is possible to be
@@ -1925,8 +2474,8 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (!quiet)
fprintf(stderr, "[ perf record: dump data: Woken up %ld times ]\n",
- waking);
- waking = 0;
+ record__waking(rec));
+ thread->waking = 0;
fd = record__switch_output(rec, false);
if (fd < 0) {
pr_err("Failed to switch to new file\n");
@@ -1940,20 +2489,24 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
alarm(rec->switch_output.time);
}
- if (hits == rec->samples) {
+ if (hits == thread->samples) {
if (done || draining)
break;
- err = evlist__poll(rec->evlist, -1);
+ err = fdarray__poll(&thread->pollfd, -1);
/*
* Propagate error, only if there's any. Ignore positive
* number of returned events and interrupt error.
*/
if (err > 0 || (err < 0 && errno == EINTR))
err = 0;
- waking++;
+ thread->waking++;
- if (evlist__filter_pollfd(rec->evlist, POLLERR | POLLHUP) == 0)
+ if (fdarray__filter(&thread->pollfd, POLLERR | POLLHUP,
+ record__thread_munmap_filtered, NULL) == 0)
draining = true;
+
+ evlist__ctlfd_update(rec->evlist,
+ &thread->pollfd.entries[thread->ctlfd_pos]);
}
if (evlist__ctlfd_process(rec->evlist, &cmd) > 0) {
@@ -2007,14 +2560,18 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
}
if (!quiet)
- fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
+ fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n",
+ record__waking(rec));
if (target__none(&rec->opts.target))
record__synthesize_workload(rec, true);
out_child:
- evlist__finalize_ctlfd(rec->evlist);
+ record__stop_threads(rec);
record__mmap_read_all(rec, true);
+out_free_threads:
+ record__free_thread_data(rec);
+ evlist__finalize_ctlfd(rec->evlist);
record__aio_mmap_read_sync(rec);
if (rec->session->bytes_transferred && rec->session->bytes_compressed) {
@@ -2204,6 +2761,78 @@ static int record__parse_affinity(const struct option *opt, const char *str, int
return 0;
}
+static int record__mmap_cpu_mask_alloc(struct mmap_cpu_mask *mask, int nr_bits)
+{
+ mask->nbits = nr_bits;
+ mask->bits = bitmap_zalloc(mask->nbits);
+ if (!mask->bits)
+ return -ENOMEM;
+
+ return 0;
+}
+
+static void record__mmap_cpu_mask_free(struct mmap_cpu_mask *mask)
+{
+ bitmap_free(mask->bits);
+ mask->nbits = 0;
+}
+
+static int record__thread_mask_alloc(struct thread_mask *mask, int nr_bits)
+{
+ int ret;
+
+ ret = record__mmap_cpu_mask_alloc(&mask->maps, nr_bits);
+ if (ret) {
+ mask->affinity.bits = NULL;
+ return ret;
+ }
+
+ ret = record__mmap_cpu_mask_alloc(&mask->affinity, nr_bits);
+ if (ret) {
+ record__mmap_cpu_mask_free(&mask->maps);
+ mask->maps.bits = NULL;
+ }
+
+ return ret;
+}
+
+static void record__thread_mask_free(struct thread_mask *mask)
+{
+ record__mmap_cpu_mask_free(&mask->maps);
+ record__mmap_cpu_mask_free(&mask->affinity);
+}
+
+static int record__parse_threads(const struct option *opt, const char *str, int unset)
+{
+ int s;
+ struct record_opts *opts = opt->value;
+
+ if (unset || !str || !strlen(str)) {
+ opts->threads_spec = THREAD_SPEC__CPU;
+ } else {
+ for (s = 1; s < THREAD_SPEC__MAX; s++) {
+ if (s == THREAD_SPEC__USER) {
+ opts->threads_user_spec = strdup(str);
+ if (!opts->threads_user_spec)
+ return -ENOMEM;
+ opts->threads_spec = THREAD_SPEC__USER;
+ break;
+ }
+ if (!strncasecmp(str, thread_spec_tags[s], strlen(thread_spec_tags[s]))) {
+ opts->threads_spec = s;
+ break;
+ }
+ }
+ }
+
+ if (opts->threads_spec == THREAD_SPEC__USER)
+ pr_debug("threads_spec: %s\n", opts->threads_user_spec);
+ else
+ pr_debug("threads_spec: %s\n", thread_spec_tags[opts->threads_spec]);
+
+ return 0;
+}
+
static int parse_output_max_size(const struct option *opt,
const char *str, int unset)
{
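
Based on the parsing above (prefix-based and case-insensitive via strncasecmp(), with any string that matches no tag falling through to THREAD_SPEC__USER), the option accepts roughly these forms; the CPU lists in the last form are hypothetical:

    --threads                    one streaming thread per CPU (same as --threads=cpu)
    --threads=cpu                one thread per CPU of the user requested CPU map
    --threads=core               one thread per core, from the CPU topology
    --threads=package            one thread per package (socket)
    --threads=numa               one thread per NUMA node
    --threads=0-3/0-3:4-7/4-7    explicit <maps>/<affinity> masks, joined by ':'
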
@@ -2328,12 +2957,22 @@ static int switch_output_setup(struct record *rec)
* --switch-output=signal, as we'll send a SIGUSR2 from the side band
* thread to its parent.
*/
- if (rec->switch_output_event_set)
+ if (rec->switch_output_event_set) {
+ if (record__threads_enabled(rec)) {
+ pr_warning("WARNING: --switch-output-event option is not available in parallel streaming mode.\n");
+ return 0;
+ }
goto do_signal;
+ }
if (!s->set)
return 0;
+ if (record__threads_enabled(rec)) {
+ pr_warning("WARNING: --switch-output option is not available in parallel streaming mode.\n");
+ return 0;
+ }
+
if (!strcmp(s->str, "signal")) {
do_signal:
s->signal = true;
@@ -2652,8 +3291,8 @@ static struct option __record_options[] = {
"Set affinity mask of trace reading thread to NUMA node cpu mask or cpu of processed mmap buffer",
record__parse_affinity),
#ifdef HAVE_ZSTD_SUPPORT
- OPT_CALLBACK_OPTARG('z', "compression-level", &record.opts, &comp_level_default,
- "n", "Compressed records using specified level (default: 1 - fastest compression, 22 - greatest compression)",
+ OPT_CALLBACK_OPTARG('z', "compression-level", &record.opts, &comp_level_default, "n",
+ "Compress records using specified level (default: 1 - fastest compression, 22 - greatest compression)",
record__parse_comp_level),
#endif
OPT_CALLBACK(0, "max-size", &record.output_max_size,
@@ -2678,11 +3317,392 @@ static struct option __record_options[] = {
&record.debuginfod.set, "debuginfod urls",
"Enable debuginfod data retrieval from DEBUGINFOD_URLS or specified urls",
"system"),
+ OPT_CALLBACK_OPTARG(0, "threads", &record.opts, NULL, "spec",
+ "write collected trace data into several data files using parallel threads",
+ record__parse_threads),
OPT_END()
};
struct option *record_options = __record_options;
+static void record__mmap_cpu_mask_init(struct mmap_cpu_mask *mask, struct perf_cpu_map *cpus)
+{
+ int c;
+
+ for (c = 0; c < cpus->nr; c++)
+ set_bit(cpus->map[c].cpu, mask->bits);
+}
+
+static int record__mmap_cpu_mask_init_spec(struct mmap_cpu_mask *mask, const char *mask_spec)
+{
+ struct perf_cpu_map *cpus;
+
+ cpus = perf_cpu_map__new(mask_spec);
+ if (!cpus)
+ return -ENOMEM;
+
+ bitmap_zero(mask->bits, mask->nbits);
+ record__mmap_cpu_mask_init(mask, cpus);
+ perf_cpu_map__put(cpus);
+
+ return 0;
+}
+
+static void record__free_thread_masks(struct record *rec, int nr_threads)
+{
+ int t;
+
+ if (rec->thread_masks)
+ for (t = 0; t < nr_threads; t++)
+ record__thread_mask_free(&rec->thread_masks[t]);
+
+ zfree(&rec->thread_masks);
+}
+
+static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr_bits)
+{
+ int t, ret;
+
+ rec->thread_masks = zalloc(nr_threads * sizeof(*(rec->thread_masks)));
+ if (!rec->thread_masks) {
+ pr_err("Failed to allocate thread masks\n");
+ return -ENOMEM;
+ }
+
+ for (t = 0; t < nr_threads; t++) {
+ ret = record__thread_mask_alloc(&rec->thread_masks[t], nr_bits);
+ if (ret) {
+ pr_err("Failed to allocate thread masks[%d]\n", t);
+ goto out_free;
+ }
+ }
+
+ return 0;
+
+out_free:
+ record__free_thread_masks(rec, nr_threads);
+
+ return ret;
+}
+
+static int record__init_thread_cpu_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int t, ret, nr_cpus = perf_cpu_map__nr(cpus);
+
+ ret = record__alloc_thread_masks(rec, nr_cpus, cpu__max_cpu().cpu);
+ if (ret)
+ return ret;
+
+ rec->nr_threads = nr_cpus;
+ pr_debug("nr_threads: %d\n", rec->nr_threads);
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ set_bit(cpus->map[t].cpu, rec->thread_masks[t].maps.bits);
+ set_bit(cpus->map[t].cpu, rec->thread_masks[t].affinity.bits);
+ if (verbose) {
+ pr_debug("thread_masks[%d]: ", t);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[t].maps, "maps");
+ pr_debug("thread_masks[%d]: ", t);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[t].affinity, "affinity");
+ }
+ }
+
+ return 0;
+}
+
+static int record__init_thread_masks_spec(struct record *rec, struct perf_cpu_map *cpus,
+ const char **maps_spec, const char **affinity_spec,
+ u32 nr_spec)
+{
+ u32 s;
+ int ret = 0, t = 0;
+ struct mmap_cpu_mask cpus_mask;
+ struct thread_mask thread_mask, full_mask, *thread_masks;
+
+ ret = record__mmap_cpu_mask_alloc(&cpus_mask, cpu__max_cpu().cpu);
+ if (ret) {
+ pr_err("Failed to allocate CPUs mask\n");
+ return ret;
+ }
+ record__mmap_cpu_mask_init(&cpus_mask, cpus);
+
+ ret = record__thread_mask_alloc(&full_mask, cpu__max_cpu().cpu);
+ if (ret) {
+ pr_err("Failed to allocate full mask\n");
+ goto out_free_cpu_mask;
+ }
+
+ ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu().cpu);
+ if (ret) {
+ pr_err("Failed to allocate thread mask\n");
+ goto out_free_full_and_cpu_masks;
+ }
+
+ for (s = 0; s < nr_spec; s++) {
+ ret = record__mmap_cpu_mask_init_spec(&thread_mask.maps, maps_spec[s]);
+ if (ret) {
+ pr_err("Failed to initialize maps thread mask\n");
+ goto out_free;
+ }
+ ret = record__mmap_cpu_mask_init_spec(&thread_mask.affinity, affinity_spec[s]);
+ if (ret) {
+ pr_err("Failed to initialize affinity thread mask\n");
+ goto out_free;
+ }
+
+ /* ignore invalid CPUs but do not allow empty masks */
+ if (!bitmap_and(thread_mask.maps.bits, thread_mask.maps.bits,
+ cpus_mask.bits, thread_mask.maps.nbits)) {
+ pr_err("Empty maps mask: %s\n", maps_spec[s]);
+ ret = -EINVAL;
+ goto out_free;
+ }
+ if (!bitmap_and(thread_mask.affinity.bits, thread_mask.affinity.bits,
+ cpus_mask.bits, thread_mask.affinity.nbits)) {
+ pr_err("Empty affinity mask: %s\n", affinity_spec[s]);
+ ret = -EINVAL;
+ goto out_free;
+ }
+
+ /* do not allow intersection with other masks (full_mask) */
+ if (bitmap_intersects(thread_mask.maps.bits, full_mask.maps.bits,
+ thread_mask.maps.nbits)) {
+ pr_err("Intersecting maps mask: %s\n", maps_spec[s]);
+ ret = -EINVAL;
+ goto out_free;
+ }
+ if (bitmap_intersects(thread_mask.affinity.bits, full_mask.affinity.bits,
+ thread_mask.affinity.nbits)) {
+ pr_err("Intersecting affinity mask: %s\n", affinity_spec[s]);
+ ret = -EINVAL;
+ goto out_free;
+ }
+
+ bitmap_or(full_mask.maps.bits, full_mask.maps.bits,
+ thread_mask.maps.bits, full_mask.maps.nbits);
+ bitmap_or(full_mask.affinity.bits, full_mask.affinity.bits,
+ thread_mask.affinity.bits, full_mask.maps.nbits);
+
+ thread_masks = realloc(rec->thread_masks, (t + 1) * sizeof(struct thread_mask));
+ if (!thread_masks) {
+ pr_err("Failed to reallocate thread masks\n");
+ ret = -ENOMEM;
+ goto out_free;
+ }
+ rec->thread_masks = thread_masks;
+ rec->thread_masks[t] = thread_mask;
+ if (verbose) {
+ pr_debug("thread_masks[%d]: ", t);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[t].maps, "maps");
+ pr_debug("thread_masks[%d]: ", t);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[t].affinity, "affinity");
+ }
+ t++;
+ ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu().cpu);
+ if (ret) {
+ pr_err("Failed to allocate thread mask\n");
+ goto out_free_full_and_cpu_masks;
+ }
+ }
+ rec->nr_threads = t;
+ pr_debug("nr_threads: %d\n", rec->nr_threads);
+ if (!rec->nr_threads)
+ ret = -EINVAL;
+
+out_free:
+ record__thread_mask_free(&thread_mask);
+out_free_full_and_cpu_masks:
+ record__thread_mask_free(&full_mask);
+out_free_cpu_mask:
+ record__mmap_cpu_mask_free(&cpus_mask);
+
+ return ret;
+}
+
+static int record__init_thread_core_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+ struct cpu_topology *topo;
+
+ topo = cpu_topology__new();
+ if (!topo) {
+ pr_err("Failed to allocate CPU topology\n");
+ return -ENOMEM;
+ }
+
+ ret = record__init_thread_masks_spec(rec, cpus, topo->core_cpus_list,
+ topo->core_cpus_list, topo->core_cpus_lists);
+ cpu_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_package_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+ struct cpu_topology *topo;
+
+ topo = cpu_topology__new();
+ if (!topo) {
+ pr_err("Failed to allocate CPU topology\n");
+ return -ENOMEM;
+ }
+
+ ret = record__init_thread_masks_spec(rec, cpus, topo->package_cpus_list,
+ topo->package_cpus_list, topo->package_cpus_lists);
+ cpu_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_numa_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ u32 s;
+ int ret;
+ const char **spec;
+ struct numa_topology *topo;
+
+ topo = numa_topology__new();
+ if (!topo) {
+ pr_err("Failed to allocate NUMA topology\n");
+ return -ENOMEM;
+ }
+
+ spec = zalloc(topo->nr * sizeof(char *));
+ if (!spec) {
+ pr_err("Failed to allocate NUMA spec\n");
+ ret = -ENOMEM;
+ goto out_delete_topo;
+ }
+ for (s = 0; s < topo->nr; s++)
+ spec[s] = topo->nodes[s].cpus;
+
+ ret = record__init_thread_masks_spec(rec, cpus, spec, spec, topo->nr);
+
+ zfree(&spec);
+
+out_delete_topo:
+ numa_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_user_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int t, ret;
+ u32 s, nr_spec = 0;
+ char **maps_spec = NULL, **affinity_spec = NULL, **tmp_spec;
+ char *user_spec, *spec, *spec_ptr, *mask, *mask_ptr, *dup_mask = NULL;
+
+ for (t = 0, user_spec = (char *)rec->opts.threads_user_spec; ; t++, user_spec = NULL) {
+ spec = strtok_r(user_spec, ":", &spec_ptr);
+ if (spec == NULL)
+ break;
+ pr_debug2("threads_spec[%d]: %s\n", t, spec);
+ mask = strtok_r(spec, "/", &mask_ptr);
+ if (mask == NULL)
+ break;
+ pr_debug2(" maps mask: %s\n", mask);
+ tmp_spec = realloc(maps_spec, (nr_spec + 1) * sizeof(char *));
+ if (!tmp_spec) {
+ pr_err("Failed to reallocate maps spec\n");
+ ret = -ENOMEM;
+ goto out_free;
+ }
+ maps_spec = tmp_spec;
+ maps_spec[nr_spec] = dup_mask = strdup(mask);
+ if (!maps_spec[nr_spec]) {
+ pr_err("Failed to allocate maps spec[%d]\n", nr_spec);
+ ret = -ENOMEM;
+ goto out_free;
+ }
+ mask = strtok_r(NULL, "/", &mask_ptr);
+ if (mask == NULL) {
+ pr_err("Invalid thread maps or affinity specs\n");
+ ret = -EINVAL;
+ goto out_free;
+ }
+ pr_debug2(" affinity mask: %s\n", mask);
+ tmp_spec = realloc(affinity_spec, (nr_spec + 1) * sizeof(char *));
+ if (!tmp_spec) {
+ pr_err("Failed to reallocate affinity spec\n");
+ ret = -ENOMEM;
+ goto out_free;
+ }
+ affinity_spec = tmp_spec;
+ affinity_spec[nr_spec] = strdup(mask);
+ if (!affinity_spec[nr_spec]) {
+ pr_err("Failed to allocate affinity spec[%d]\n", nr_spec);
+ ret = -ENOMEM;
+ goto out_free;
+ }
+ dup_mask = NULL;
+ nr_spec++;
+ }
+
+ ret = record__init_thread_masks_spec(rec, cpus, (const char **)maps_spec,
+ (const char **)affinity_spec, nr_spec);
+
+out_free:
+ free(dup_mask);
+ for (s = 0; s < nr_spec; s++) {
+ if (maps_spec)
+ free(maps_spec[s]);
+ if (affinity_spec)
+ free(affinity_spec[s]);
+ }
+ free(affinity_spec);
+ free(maps_spec);
+
+ return ret;
+}
+
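
The grammar record__init_thread_user_masks() accepts is <maps cpus>/<affinity cpus> pairs joined by ':'; each half is later handed to perf_cpu_map__new(), so the usual CPU-list syntax (e.g. 0,2-4) applies. A minimal sketch of just the tokenization, with a hypothetical input and none of the allocation or error handling above:

    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
    	char input[] = "0,2-4/2-4:1,5-7/5-7";	/* hypothetical spec */
    	char *spec_ptr, *mask_ptr, *spec, *maps, *affinity;

    	/* outer split on ':' yields one <maps>/<affinity> pair per thread */
    	for (spec = strtok_r(input, ":", &spec_ptr); spec;
    	     spec = strtok_r(NULL, ":", &spec_ptr)) {
    		/* inner split on '/' separates maps mask from affinity mask */
    		maps = strtok_r(spec, "/", &mask_ptr);
    		affinity = strtok_r(NULL, "/", &mask_ptr);
    		if (!maps || !affinity)
    			return 1;
    		printf("maps=%s affinity=%s\n", maps, affinity);
    	}
    	return 0;
    }
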
+static int record__init_thread_default_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+
+ ret = record__alloc_thread_masks(rec, 1, cpu__max_cpu().cpu);
+ if (ret)
+ return ret;
+
+ record__mmap_cpu_mask_init(&rec->thread_masks->maps, cpus);
+
+ rec->nr_threads = 1;
+
+ return 0;
+}
+
+static int record__init_thread_masks(struct record *rec)
+{
+ int ret = 0;
+ struct perf_cpu_map *cpus = rec->evlist->core.user_requested_cpus;
+
+ if (!record__threads_enabled(rec))
+ return record__init_thread_default_masks(rec, cpus);
+
+ switch (rec->opts.threads_spec) {
+ case THREAD_SPEC__CPU:
+ ret = record__init_thread_cpu_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__CORE:
+ ret = record__init_thread_core_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__PACKAGE:
+ ret = record__init_thread_package_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__NUMA:
+ ret = record__init_thread_numa_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__USER:
+ ret = record__init_thread_user_masks(rec, cpus);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
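
Tying the modes together: record__init_thread_cpu_masks() creates one thread per requested CPU with a single bit set in both masks, while the core/package/numa variants feed the topology's CPU lists into record__init_thread_masks_spec(), which rejects empty masks and masks intersecting an earlier spec. On a hypothetical two-package machine with CPUs 0-7, four per package, --threads=package would produce:

    nr_threads: 2
    thread_masks[0]: maps 0-3, affinity 0-3
    thread_masks[1]: maps 4-7, affinity 4-7
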
int cmd_record(int argc, const char **argv)
{
int err;
@@ -2764,9 +3784,20 @@ int cmd_record(int argc, const char **argv)
goto out_opts;
}
- if (rec->opts.kcore)
+ if (rec->opts.kcore || record__threads_enabled(rec))
rec->data.is_dir = true;
+ if (record__threads_enabled(rec)) {
+ if (rec->opts.affinity != PERF_AFFINITY_SYS) {
+ pr_err("--affinity option is mutually exclusive to parallel streaming mode.\n");
+ goto out_opts;
+ }
+ if (record__aio_enabled(rec)) {
+ pr_err("Asynchronous streaming mode (--aio) is mutually exclusive to parallel streaming mode.\n");
+ goto out_opts;
+ }
+ }
+
if (rec->opts.comp_level != 0) {
pr_debug("Compression enabled, disabling build id collection at the end of the session.\n");
rec->no_buildid = true;
@@ -2800,6 +3831,11 @@ int cmd_record(int argc, const char **argv)
}
}
+ if (rec->timestamp_filename && record__threads_enabled(rec)) {
+ rec->timestamp_filename = false;
+ pr_warning("WARNING: --timestamp-filename option is not available in parallel streaming mode.\n");
+ }
+
/*
* Allow aliases to facilitate the lookup of symbols for address
* filters. Refer to auxtrace_parse_filters().
@@ -2808,17 +3844,6 @@ int cmd_record(int argc, const char **argv)
symbol__init(NULL);
- if (rec->opts.affinity != PERF_AFFINITY_SYS) {
- rec->affinity_mask.nbits = cpu__max_cpu().cpu;
- rec->affinity_mask.bits = bitmap_zalloc(rec->affinity_mask.nbits);
- if (!rec->affinity_mask.bits) {
- pr_err("Failed to allocate thread mask for %zd cpus\n", rec->affinity_mask.nbits);
- err = -ENOMEM;
- goto out_opts;
- }
- pr_debug2("thread mask[%zd]: empty\n", rec->affinity_mask.nbits);
- }
-
err = record__auxtrace_init(rec);
if (err)
goto out;
@@ -2948,6 +3973,12 @@ int cmd_record(int argc, const char **argv)
goto out;
}
+ err = record__init_thread_masks(rec);
+ if (err) {
+ pr_err("Failed to initialize parallel data streaming masks\n");
+ goto out;
+ }
+
if (rec->opts.nr_cblocks > nr_cblocks_max)
rec->opts.nr_cblocks = nr_cblocks_max;
pr_debug("nr_cblocks: %d\n", rec->opts.nr_cblocks);
@@ -2961,11 +3992,12 @@ int cmd_record(int argc, const char **argv)
err = __cmd_record(&record, argc, argv);
out:
- bitmap_free(rec->affinity_mask.bits);
evlist__delete(rec->evlist);
symbol__exit();
auxtrace_record__free(rec->itr);
out_opts:
+ record__free_thread_masks(rec, rec->nr_threads);
+ rec->nr_threads = 0;
evlist__close_control(rec->opts.ctl_fd, rec->opts.ctl_fd_ack, &rec->opts.ctl_fd_close);
return err;
}