CSAPP-优化程序性能

编程/技术 2019-03-28 @ 08:20:48 浏览数: 364 净访问: 248 By: skyrover

本博客采用创作共用版权协议, 要求署名、非商业用途和保持一致. 转载本博客文章必须也遵循署名-非商业用途-保持一致的创作共用协议


个人认为对一个程序员帮助最大的是算法,其次是操作系统

前言

  • 如何提高程序性能?
    1. 合适的算法和数据结构
    2. 编写出编译器能够有效优化以转换成高效可执行代码的源代码
    3. 将一个任务分成多个部分,这些部分可以在多核和多处理器的某种组合上并行的计算。
  • 如何进行优化
    1. 程序优化第一步是消除不必要的内容,消除不必要的函数调用,条件测试和存储器引用。
    2. 了解处理器的运作,利用处理器提供的指令级并行能力,同时执行多条指令。

5.1 优化编译器的能力和局限性

  1. 存储器别名使用
    void twiddle1(int *xp, int*yp){
         *xp += *yp;
         *xp += *yp;
     }
    
  2. 函数调用
    int counter = 0;
     int f(){
         return counter++;
     }
     int func1(){
         return f() + f();
     }
     int func2(){
         return 2*f();
     }
    
    主要原因是函数可能有副作用-修改了全局程序状态的一部分。

使用内联函数替换优化函数调用

int func1in(){
    int t = counter++;
    t += counter++;
    return t
}
// 进一步优化
int func1opt(){
    int t = 2 * counter + 1;
    counter += 2
}

5.2 表示程序性能

每元素的周期数(CPE),下面程序2的CPE优于程序1的CPE

void psum1(float a[], float p[], long int n){
    long int i;
    p[0] = a[0];
    for (i = 1; i < n; i++)
        p[i] = p[i-1] + a[i];
}

void psum2(float a[], float p[], long int n){
    long int i;
    p[0] = a[0];
    for (i = 1; i < n - 1; i+=2){
        float mid_val = p[i-1] = a[i];
        p[i] = mid_val;
        p[i+1] = mid_val + a[i+1];
    }
    if (i < n){
        p[i] = p[i-1] + a[i];
    }
}

5.3 程序示例

typedef struct{
    long int len;
    data_t *data;
} vec_rec, *vec_ptr;

其中 vec_rec 为struct的别名,*vec_ptrstruct *的别名。一般会加上结构名,如果结构体内部需要引用该结构体的话:

typedef struct VEC_REC{
    long int len;
    data_t *data;
    struct VEC_REC *pNext;
} vec_rec, *vec_ptr;

测试代码:

vec_ptr new_vec(long int len){
    vec_ptr result = (vec_ptr) malloc(sizeof(vec_rec));
    if (!result){
        return NULL;
    }
    result->len = len;
    if (len > 0){
        data_t *data = (data_t *)calloc(len, sizeof(data_t));
        if (!data){
            free((void *) result);
            return NULL;
        }
        result->data = data;
    }else{
        result->data = NULL;
    }
    return result;
}

int get_vec_element(vec_ptr v, long int index, data_t *dest){
    if (index < 0 || index >= v->len){
        returrn 0;
    }
    *dest = v->data[index];
    return 1;
}

long int vec_length(vec_ptr v){
    return v->len;
}

// 合并运算
void combine1(vec_ptr v, data_t *dest){
    long int i;
    *dest = IDENT;
    for (i=0;i<vec_length(v);i++){
        data_t val;
        get_vec_element(v, i, &val);
        *dest = *dest OP val;
    }
}

5.4 消除循环的低效率

每次循环计算length是没有必要的,而且是低效的。所以可以改成下面的:

void combine2(vec_ptr v, data_t *dest){
    long int i;
    long int length = vec_length(v);
    *dest = INDENT;
    for (i=0;i<length;i++){
        data_t val;
        get_vec_element(v, i, &val);
        *dest = *dest OP val;
    }
}

这类优化被称为:代码移动,识别要多次计算但是计算结果不会变,可以将计算移到代码前面不会被多噢次求值的地方。

5.5 减少过程调用

data_t *get_vec_start(vec_ptr v){
    return v->data;
}

void combine3(vec_ptr v, data_t *dest){
    long int i;
    long int length = vec_length(v);
    data_t *data = get_vec_start(v);
    *dest = INDENT;
    for (i=0;i<lenght;i++){
        *dest = *dest OP data[i];
    }
}

没有函数调用来获取向量元素,直接访问数组,但是这样可能会破坏程序的模块性。所以折衷方案是为了速度,损害一些模块性和抽象性,但是增加文档。

5.6 消除不必要的存储器引用

前面的函数代码产生的汇编代码如下:

.L498:
    movss (%rbp), %xmm0
    mulss (%rax, %rdx, 4), %xmm0
    movss %xmm0, (%rbp)
    addq $1, %rdx
    cmpq %rdx, %r12
    jg .L498

对应指针dest的地址在%rbp中,所以上面这段汇编执行了从%rbp取出dest的值,然后做乘法,再写回去。至于为什么编译器不能做优化的原因是,编译器没法确定传进来的参数地址到底是引用哪里的,能否做这样的优化,但是程序员可以显式的消除这样无用的存储器读写,将函数改为:

void combine4(vec_ptr v, data_t *dest){
    long int i;
    long int length = vec_length(v);
    data_t *data = get_vec_start(v);
    data_t acc = IDENT;
    for (i=0;i<length;i++){
        acc = acc OP data[i];
    }
    *dest = acc;
}

5.7 理解现代处理器

考虑利用处理器微体系结构的优化,也就是处理器用来执行指令的底层系统设计。指令级并行:多条指令可以并行的执行,同时又呈现一种简单的顺序执行指令的表象。

  • 延迟界限:当一系列的操作必须按照严格顺序执行的时候就会遇到延迟界限
  • 吞吐量界限:刻画了处理器功能单元的原始计算能力
  1. 整体操作

CPU整个设计有两个主要部分:指令控制单元(ICU)和执行单元(EU),这块讲了分支预测(可能失败),指令译码(接收实际程序指令,将其转换为一组基本操作),退役单元(记录正在进行的处理,任何对程序状态的更新只会在指令退役时候才会发生),这块大概了解下。。并不能完全的理解。

  1. 功能单元的性能
  • 延迟:表示完成运算所需要的总时间,加法可以在一个指令周期完成,并且硬件有三个完全流水线化的能够执行整数加法的功能单元。
  • 发射时间:表示两个连续的同类型计算之间需要最小时钟周期数
  • 最大吞吐量:发射时间的倒数

CPE(每元素的周期数)值的两个界限:

  • 延迟:任何必须按照严格顺序完成合并运算的函数所需要的最小CPE值
  • 吞吐量:根据功能单元产生结果的最大速率,吞吐量界限给出了CPE的最小界限
  1. 处理器操作的抽象模型

这一小节讲的不错,按照上面给的combine4函数的循环部分,给出了每次迭代会使用什么寄存器,进行什么操作,然后更新什么寄存器。比如对于这样的汇编

.L488:
    mulss (%rax, %rdx, 4), %xmm()
    addq $1, %rdx
    cmpq %rdx, %rbp
    jg .L488

会访问%rax, %rbp, %rdx, %xmm()寄存器,进行load, mul, add, cmp, jg操作,最后更新的寄存器只有%rdx, %xmm(),分别表示更新循环变量i和累积变量acc

寄存器可以被分为4类:

  • 只读:比如%rax, %rbp
  • 只写
  • 局部:比如条件码寄存器
  • 循环:比如%rdx, %xmm()

得到了抽象数据流图后,就可以对限制程序性能的关键路径进行分析了。对于浮点数乘法来说,有两个数据相关链,分别对应于乘法对acc的修改和加法对循环变量i的修改,单精度乘法延迟为4个周期,整数加法延迟为1个周期,所以乘法就成为关键路径,那么我们就得到了4个周期的延迟界限的CPE。

对于整数加法情况,实际情况和用关键路径预测的不一致,因为是关键路径只是提供程序需要周期数的下界,可能还有其他因素限制性能,包括可用的功能单元数量和任何一步中功能单元之间能够传递数据值的数量。

所以基本上延迟界限是基本的限制,那么下来就是要重新调整操作结构,增强指令的并行性!

5.8 循环展开

循环展开就是在每次迭代的时候增加计算的元素数量,减少循环的迭代次数,就和上面的函数一样。这种方法有两个好处:

  1. 减少了循环索引计算和条件分支的判断
  2. 并且可以在此增加一些方法来进一步变化代码,减少整个计算中关键路径的操作数量

对于使用累计变量优化的combine4函数,可以做如下的优化

void combine5(vec_ptr v, data_t *dest){
    long int i;
    long int length = vec_length(v);
    long int limit = length - 1;
    data_t *data = get_vec_start(v);
    data_t acc = IDENT;
    for (i=0;i<limit;i+=2){
        acc = (acc OP data[i]) OP data[i+1];
    }
    for(;i<length;i++){
        acc = acc OP data[i];
    }
    *dest = acc;
}

在这里使用了2次展开,首先要确保第一次循环不会越界,也就是i+1<n,那么就是i<n-1,在第一个循环结束的时候,可能还有剩余元素,因此需要第二个循环将剩余元素进行处理。归纳到k次展开,那么上限就是n-k+1,在第一个循环对元素ii+k-1应用,然后在后面循环以同样的处理方式进行处理。

通过这样的循环展开技术,对于整数加法和乘法,CPE有所改进,原因是减少了循环开销操作,但是对于浮点数操作没有变化。对于整数乘法有比较大的改变是因为编译器是基于重关联变换做优化的,也就是在acc = (acc OP data[i]) OP data[i+1];这里,会进行这样的优化acc = acc OP (data[i] OP data[i+1]);,这样做的好处在于可以使得指令并行化,每个迭代内的第一个乘法都不需要等待前一次迭代的累计值就可以执行,也就是每次迭代中关键路径只有一个操作。而浮点数乘法不会做这样的优化原因是这种重新结合可能对结果产生影响。

5.9 提高并行性

前面可以看出性能瓶颈在于只有一个累计变量,因此每次迭代必须等待前一个迭代的累计变量成功赋值,所以在这里可以增加累计变量。最简单的就是设置两个累计变量,按照奇偶以此累计。下面是两次展开,两路并行的代码:

void combine6(vec_ptr v, data_t *data){
    long int i;
    long int length = vec_length(v);
    long int limit = length - 1;
    data_t *data = get_vec_start(v);
    data_t acc0 = IDENT;
    data_t acc1 = IDENT;
    for(i=0;i<limit;i+=2){
        acc0 = acc0 OP data[i];
        acc1 = acc1 OP data[i+1];
    }
    for(;i<length;i++){
        acc0 = acc0 OP data[i];
    }
    *dest = acc0 OP acc1;
}

采用这种变换,所有操作CPE都有改进,直到达到最优。具体数据流图可以参考书上。

最后我们得到结论,循环展开和并行的累计到多个值里,是提高程序性能的更可靠的方法。

5.11 一些限制因素

程序数据流图是挺有用,也很直观,在这里关键路径指明了执行该程序所需要时间的一个基本下界。如果程序中有某条数据相关链,该条链上的所有延迟之和等于T,那么这个程序至少需要T个周期才能执行完。

功能单元的吞吐量界限也是程序执行时间的一个下届。假设一个程序需要N种运算的计算,而微处理器只有m个能执行这个操作的功能单元,并且这些单元的发射时间(连续同类运算之间所需要的最小时钟周期数)为i,那么这个程序的执行至少需要N*i/m个周期

  • 寄存器溢出

循环并行性可以提高性能,但是过多的局部变量将无法全都存在寄存器中,比如在IA32指令集中,只有少量的寄存器来存放累计值,如果并行度p超过了可用寄存器数量,那么编译器就会诉诸溢出,将某些临时值存放在栈中,这样就会导致性能的急剧降低。

  • 分支预测和预测错误的惩罚

这里分支预测如果不能正确预测,可能会有严重的预测错误惩罚。不仅要丢弃所有投机执行的结果,在正确的位置,重新开始读取指令的过程,在产生有用的结果之前,必须重新填充指令流水线。

有两个通用原则来降低这一问题的影响:

  1. 不要过分关心可以预测的分支
  2. 书写适合用条件传送实现的代码

第一点不必多说,因为可以预测,所以只有在最后一次会惩罚,基本不会影响太多性能。

第二点中,最近的x86处理器有条件传送指令,编译条件语句和表达式的时候,GCC能产生使用这些指令的代码,基本思想是计算出一个条件表达式或者语句的两个方向的值,然后用条件传送选择期望的值。

下面是一个例子,第一个是普通条件控制语句实现的

int absdiff(int x, int y){
    return (x < y ? y - x : x -y);
}

下面这个是使用条件传送:

int cmovdiff(int x, int y){
    int tval = y-x;
    int rval = x-y;
    int test = x < y; //可以看到这里计算了条件的结果和两种执行路径的最后结果并重新命名


    if(test)
    rval = tval;
    return rval;
}

翻译为汇编语言:

movl 8(%ebp), %ecx
movl 12(%ebp), %edx
movl %edx,%ebx
subl %ecx,%ebx
movl %ecx,%eax
subl %edx, %eax
cmpl %edx,%ecx // 这里和上面的代码有着同样的操作计算两种路径的结果 先求值后验证
cmovl %ebx,%eax // 条件转移指令

与条件跳转不同,处理器可以执行条件传送,而无需预测测试的结果。处理器只是读源值(可能是从存储器中),检查条件码,然后要么更新目的寄存器,要么保持不变。

5.12 理解存储器性能

  • 加载的性能

比如下面的汇编:

.L11:
    addl $1, %eax
    movq (%rdi), %rdi
    testq %rdi, %rdi
    jne .L11

这里movq就是循环中的关键瓶颈,寄存器%rdi的每个后继值都依赖于加载操作的结果,加载操作以%rdi中的值作为它的地址。

  • 存储的性能

一系列存储操作不会产生数据相关,只有加载操作是受到存储操作结果影响的。

这样一个函数,如果srcdest是指向同一个地方的指针,那么其性能就远小于指向不同地方:

void write_read(int *src, int *dest, int n){
    int cnt = n;
    int val = 0;
    while (cnt--){
        *dest = val;
        val = (*src) + 1;
    }
}

5.13 性能提高技术

总结一下:

  1. 高级设计:算法和数据结构
  2. 基本编码原则
    1. 消除连续函数调用,将计算移到循环外
    2. 消除不必要的存储器引用,引入临时变量来保存中间结果
  3. 低级优化
    1. 展开循环
    2. 多个累计变量和重新结合技术
    3. 功能的风格重写条件操作

就我自己而言,我觉得最重要的是算法,写出好的算法,高效的算法提升是最大,方法也是最简单的。其次基本编码原则是一个好的习惯,需要保持。对于低级优化来说,写高级语言可能帮助不是那么大。

5.14 确认和消除性能瓶颈

  1. Amdahl 定律

简要来说:当我们加快系统一个部分的速度时候,对系统整体性能的影响依赖于这个部分有多重要和速度提升了多少。要大幅度提高整个系统的速度,必须提高整个系统很大一部分的速度。所以通过只改进系统一部分获得性能收益,收益既依赖于我们对这个部分的提高程度,也依赖于这个部分原来在整个时间中占的比例。

Python 程序性能分析

cProfile

使用工具cProfile,在一个脚本中写好要测试的程序,然后在终端中运行python -m cProfile -s tottime calc_efficiency.py表示对单个函数的运行总时间进行统计,这样就会在终端上打印出来性能分析结果,比如这样:

1723 function calls (1578 primitive calls) in 0.004 seconds

Ordered by: internal time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   4    0.000    0.000    0.000    0.000 _methods.py:86(_var)
  13    0.000    0.000    0.000    0.000 calculators.py:18(nan2zero)
   2    0.000    0.000    0.000    0.000 financial.py:25(_adjust_returns)
   2    0.000    0.000    0.000    0.000 {method 'cumprod' of 'numpy.ndarray' objects}
   2    0.000    0.000    0.000    0.000 {method 'accumulate' of 'numpy.ufunc' objects}

表示1723个函数调用被监控,其中1578是原生调用,即不涉及递归调用,总共执行时间是0.004秒,结果列表是按照函数内部调用时间来排序,即不包括他自己调用的其他函数的时间。

  • ncalls 表示调用次数,如果是4/3 表示4次总调用次数,3表示原生调用次数
  • tottime 表示函数内部调用时间
  • percall 是tottime/ncalls
  • cumtime 累计时间,包含了自己内部调用函数的时间
  • 最后一列,文件名,行号,函数名

cProfile除了可以在终端上使用,还有编程接口cProfile.run('re.compile("foo|bar")')

具体cProfile的使用方法请戳这里

另外官方给出的使用pstats模块的Stats类来读取和操作stats文件,Analysis of the profiler data is done using the Stats class.

一个使用示例:

import cProfile
import pstats
from ability import WholeAbility
import time

profile = cProfile.Profile()
profile.enable()
t0 = time.time()

# 测试部分 无关紧要
fundId = "test"
start = "20151021"
end = "20151023"
wa = WholeAbility(fundId, start, end, assets, flows, stocks)
ability_data, positions, trades = wa.calc_ability()
summary_data = wa.summary()
print("using {}".format(time.time()-t0))
# 测试部分 无关紧要

profile.disable()
profile.print_stats(sort='tottime')
sortby = "tottime"
ps = pstats.Stats(profile).sort_stats(sortby)
ps.dump_stats("./eff.txt")

使用dump_stats可以把当前性能分析结果写入文件,这个结果可以在后面进行可视化分析。

gprof2dot 分析结果可视化
  • 安装

  • Mac: brew install graphviz

  • Ubuntu: apt-get install python graphviz

  • 下载

pip install gprof2dot

  • 使用

然后就可以使用gprof2dot来调取之前存储的分析结果来生成可视化数据了,相当好用,利用这张图,就可以看出你的程序哪里是性能瓶颈了,然后有目的的去优化,再分析!

gprof2dot -f pstats mkm_run.prof | dot -Tpng -o mkm_run.png

其中每个node的信息如下:

+------------------------------+
|        function name         |
| total time % ( self time % ) |
|         total calls          |
+------------------------------+

每个edge的信息如下:

total time %
   calls
parent --------------------> children
  • 备注
  1. 尽量不要使用deepcopy,很慢,详见这篇文章蜗牛般的 Python 深拷贝
  2. 优化算法,不要在循环里面进行耗时操作


点赞走一波😏


评论

提交评论