summaryrefslogtreecommitdiffstats
path: root/net/ceph/osdmap.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osdmap.c')
-rw-r--r--net/ceph/osdmap.c101
1 files changed, 71 insertions, 30 deletions
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index d2436880b305..6824c0ec8373 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -153,6 +153,32 @@ bad:
return -EINVAL;
}
+static void crush_finalize(struct crush_map *c)
+{
+ __s32 b;
+
+ /* Space for the array of pointers to per-bucket workspace */
+ c->working_size = sizeof(struct crush_work) +
+ c->max_buckets * sizeof(struct crush_work_bucket *);
+
+ for (b = 0; b < c->max_buckets; b++) {
+ if (!c->buckets[b])
+ continue;
+
+ switch (c->buckets[b]->alg) {
+ default:
+ /*
+ * The base case, permutation variables and
+ * the pointer to the permutation array.
+ */
+ c->working_size += sizeof(struct crush_work_bucket);
+ break;
+ }
+ /* Every bucket has a permutation array. */
+ c->working_size += c->buckets[b]->size * sizeof(__u32);
+ }
+}
+
static struct crush_map *crush_decode(void *pbyval, void *end)
{
struct crush_map *c;
@@ -246,10 +272,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
if (b->items == NULL)
goto badmem;
- b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
- if (b->perm == NULL)
- goto badmem;
- b->perm_n = 0;
ceph_decode_need(p, end, b->size*sizeof(u32), bad);
for (j = 0; j < b->size; j++)
@@ -368,6 +390,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
dout("crush decode tunable chooseleaf_stable = %d\n",
c->chooseleaf_stable);
+ crush_finalize(c);
+
done:
dout("crush_decode success\n");
return c;
@@ -719,7 +743,7 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
map->pool_max = -1;
map->pg_temp = RB_ROOT;
map->primary_temp = RB_ROOT;
- mutex_init(&map->crush_scratch_mutex);
+ mutex_init(&map->crush_workspace_mutex);
return map;
}
@@ -753,6 +777,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
kfree(map->osd_weight);
kfree(map->osd_addr);
kfree(map->osd_primary_affinity);
+ kfree(map->crush_workspace);
kfree(map);
}
@@ -808,6 +833,31 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
return 0;
}
+static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
+{
+ void *workspace;
+ size_t work_size;
+
+ if (IS_ERR(crush))
+ return PTR_ERR(crush);
+
+ work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
+ dout("%s work_size %zu bytes\n", __func__, work_size);
+ workspace = kmalloc(work_size, GFP_NOIO);
+ if (!workspace) {
+ crush_destroy(crush);
+ return -ENOMEM;
+ }
+ crush_init_workspace(crush, workspace);
+
+ if (map->crush)
+ crush_destroy(map->crush);
+ kfree(map->crush_workspace);
+ map->crush = crush;
+ map->crush_workspace = workspace;
+ return 0;
+}
+
#define OSDMAP_WRAPPER_COMPAT_VER 7
#define OSDMAP_CLIENT_DATA_COMPAT_VER 1
@@ -1214,13 +1264,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
/* crush */
ceph_decode_32_safe(p, end, len, e_inval);
- map->crush = crush_decode(*p, min(*p + len, end));
- if (IS_ERR(map->crush)) {
- err = PTR_ERR(map->crush);
- map->crush = NULL;
+ err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
+ if (err)
goto bad;
- }
- *p += len;
/* ignore the rest */
*p = end;
@@ -1375,7 +1421,6 @@ e_inval:
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
struct ceph_osdmap *map)
{
- struct crush_map *newcrush = NULL;
struct ceph_fsid fsid;
u32 epoch = 0;
struct ceph_timespec modified;
@@ -1414,12 +1459,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
/* new crush? */
ceph_decode_32_safe(p, end, len, e_inval);
if (len > 0) {
- newcrush = crush_decode(*p, min(*p+len, end));
- if (IS_ERR(newcrush)) {
- err = PTR_ERR(newcrush);
- newcrush = NULL;
+ err = osdmap_set_crush(map,
+ crush_decode(*p, min(*p + len, end)));
+ if (err)
goto bad;
- }
*p += len;
}
@@ -1439,12 +1482,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
map->epoch++;
map->modified = modified;
- if (newcrush) {
- if (map->crush)
- crush_destroy(map->crush);
- map->crush = newcrush;
- newcrush = NULL;
- }
/* new_pools */
err = decode_new_pools(p, end, map);
@@ -1505,8 +1542,6 @@ bad:
print_hex_dump(KERN_DEBUG, "osdmap: ",
DUMP_PREFIX_OFFSET, 16, 1,
start, end - start, true);
- if (newcrush)
- crush_destroy(newcrush);
return ERR_PTR(err);
}
@@ -1942,10 +1977,10 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
- mutex_lock(&map->crush_scratch_mutex);
+ mutex_lock(&map->crush_workspace_mutex);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
- weight, weight_max, map->crush_scratch_ary);
- mutex_unlock(&map->crush_scratch_mutex);
+ weight, weight_max, map->crush_workspace);
+ mutex_unlock(&map->crush_workspace_mutex);
return r;
}
@@ -1978,8 +2013,14 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
return;
}
- len = do_crush(osdmap, ruleno, pps, raw->osds,
- min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
+ if (pi->size > ARRAY_SIZE(raw->osds)) {
+ pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
+ pi->id, pi->crush_ruleset, pi->type, pi->size,
+ ARRAY_SIZE(raw->osds));
+ return;
+ }
+
+ len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
osdmap->osd_weight, osdmap->max_osd);
if (len < 0) {
pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",