一、概述
线性流水线与非线性流水线是CPU中指令处理流水线的一种分类标准。线性流水线很好理解,就是一条路走到黑的流水线;非线性流水线则不同,它可能存在前馈与反馈,每个部件可能使用一次或多次,它就没法像线性流水线那么一个一个部件按部就班的走。因此出现了一个问题,如果我第一个任务第二次使用部件A,第二个任务恰好第一次也使用部件A,这会怎么样?
出现矛盾了,流水线卡住了。这不好,因此需要流水线调度算法来安排好每一个任务,在让它们不冲突的同时,最大可能提高流水线的效率。
二、分析
1、非线性流水线的描述
线性流水线能够用流水线连接图唯一表示,非线性流水线则不行,它的描述需要流水线连接图和预约表的协同作用。一个流水线连接图可能有好几种执行顺序,也就对应好几个可能的预约表,一个预约表也可能对应好几张流水线连接图。只有同时看这两者,才能确定最终的处理顺序。下图就是一张预约表。
这是一个有四个组件,七个流水段的流水线。横轴代表时间,纵轴代表组件。×则代表在某一时刻当前使用的组件。因此我们可以得知,每个框中只能有一个×,否则会出现冲突。
2、调度算法的推导
首先,我们要提出三个定义:
启动距离、禁止向量、冲突向量。
启动距离指的是第一个任务进入流水线后,第二个任务进入时,不发生冲突的时间。显然,启动距离又长又短,我们的目的就是找出平均启动距离最短的一个任务载入序列。
禁止向量指的是预约表中每一行任意两个×之间距离的集合。例如上图中,禁止向量为(2,4,6)。
冲突向量的指的是如下一个向量:(Cm,Cm-1......C2C1),其中C为0或1;m为禁止向量中的最大值。对于Ci,如果禁止向量在i中,则Ci=1,否则Ci=0。
上图预约表对应的冲突向量指的是(101010)。
我们可以这样理解冲突向量:对于一个任务x,其耗时为nk,n为流水段数量,k为每一流水段耗费的时间。假设各流水段好费时间相同。那么,在x进入后的第k,2k...nk时刻,均可以加载下一个任务y。然而可能出现冲突。可以通过计算冲突向量来规避这种冲突:
若冲突向量的Cm=1,则x进入后的第mk时刻不能加载y,否则会冲突。如图,C1=0,C2=1。如果在x进入后2时刻,即横轴为3的时候加载y,那么在横轴为5的时候,x与y争用S3部件,出现冲突。也就是说,y可以在C1、C3、C5也就是横轴为2、4、6的时候载入。
现在我们假设y在C3载入,也就是在横轴为4处载入。对于y来说,在它没结束之前,有哪些时刻不能载入任务呢?很容易想的一点是:在C2、C4、C6不能载入。因为y是和x一样的任务,因此x中不能载入的时刻,y也不能载入。但这够了么?还不够,x还会对y之后的任务产生影响,如何表达这种影响呢?用x的冲突向量来表达。
接着拿y在C3载入举例,这时,从x的冲突向量可以知道,横轴为3、5、7的时候不能载入,从y的冲突向量可以知道,横轴为6、8、10的时候无法载入。做一下并集,在3、5、6、7、8、10的时候不能载入。由于y在4的时候载入,因此仅需要注意5、6、7、8、10即可。由5、6、7、8、10可以得出y的冲突向量101111。这是我们手动算出来的。如何通过数学方法计算呢?
通过右移与按位取并操作进行。
如上图,右移3代表选择x的非冲突时刻C3,右移3得到的000101代表对于y来说,x的影响,101010是y自身的影响。二者叠加可得所有不能载入的时刻。
再如上图,初始任务x的冲突向量为1010100,第二个任务y若选择在2时间后载入,那么x对其的影响可以写为0010101;两个影响叠加为1010101,与最初的影响相同;第二个任务y若选择在1时间后载入,那么x对其的影响为0101010,两个影响合并为1111110,这是y的冲突向量;第三个任务z若选择在y后1时间载入,那么y对其的影响为0111111,两个影响合并为1111111。
也可以将1111111看成是xyz三个任务共同影响的结果,因为1111111是x的冲突向量右移两位并上y右移一位并上z得到的。
如此,我们需要得到所有冲突向量右移所有的0的位置得到的所有的结果,然后按位取并,继续右移继续取并,直到结果与之前的相同。这样我们就会得到一张图。
上图是101010所得到的的图。这其中的所有循环,就是我们要找的非冲突的载入序列。也就是我们算法最后需要的结果。
三、代码实现
由1、2可以得知,算法的整体思路可以按如下几步:
第一,输入预约表;
第二,得到初始冲突向量;
第三,生成状态图;
第四,找到所有循环。
其中,第三和第四步可以转化为如下问题:生成一张有向图,记录其中所有循环;但是我的算法可以按另一种形式描述:生成一棵树,当根节点对应的叶子节点与之前的节点相同是,记录之前的节点到根节点的路径。
以下是我的代码。
1、输入函数
为便于输入,选择从外部文件读取预约表,函数如下:
int readTable(int a,int b)
{
FILE*fp = NULL;//需要注意
fp = fopen(F_PATH, "r");
if (NULL == fp) return -1;//要返回错误代码
for (int i = 0; i
< b; j++)
{
int tmp;
fscanf(fp,"%d", &tmp);
ResTable[i][j] = tmp;
}
fclose(fp);
fp = NULL;//需要指向空,否则会指向原打开文件地址
return 0;
}
使用fopen读入文件流,用fp储存文件流,然后生成预约表对应的二维数组即可。
2、得到初始冲突向量
为得到冲突向量,首先要得到禁止向量。由于禁止向量中没有重复元素,选择使用set保存。然后遍历各行,计算出预约表中不为零的元素对应的列坐标的差的绝对值,保存在dis中。
然后遍历dis,使用string生成冲突向量,dis中的值作为string中元素下标,这些下标对应的值为1,其余为0。从而生成初始冲突向量。
void Create_Initial_Conflict_Vector(int a,int b)
{
set
dis;//不为零之间的距离
for (int i = 0; i
tmp;
for (int j = 0; j < b; j++)
{
if (ResTable[i][j] != 0)
{
tmp.push_back(j);
}
for (int m = 0; m < tmp.size(); m++)
{
for (int n = m + 1; n < tmp.size(); n++)
{
dis.insert(abs(tmp[m] - tmp[n]));
}
}
}
}
//printSet(dis);
for (int i = 1; i < b; i++)
{
if (dis.find(i) != dis.end())
Initial_Conflict_Vector = '1' + Initial_Conflict_Vector;
else
Initial_Conflict_Vector = '0' + Initial_Conflict_Vector;
}
}
3、生成状态图并找出所有循环
重头戏,算法的精髓就在这里。我选择使用递归生成状态图并保存循环。
首先需要写两个工具函数,string右移以及string按位取并。如下:
string StringRightMove(string s, int k)
{
while (k != 0)
{
s = '0' + s;
s.pop_back();
k--;
}
return s;
}
string StringAnd(string s1, string s2)
{
for (int i = 0; i < s1.size(); i++)
{
if (s1[i] == '0'&&s2[i] == '0')
s1[i] = '0';
else
s1[i] = '1';
}
return s1;
}
然后开始写递归函数。代码如下:
void Find_real_Circle(string ConVector, unordered_set
CVTable, vector
Now_Start_Cycle, int len)
{
if (CVTable.find(ConVector) != CVTable.end())
{
StartCycle.push_back(Now_Start_Cycle);
unordered_set
::iterator it = CVTable.begin();
int numhead = 0;
while (*it != ConVector)
{
it++;
numhead++;
}
vector
RealCycle(Now_Start_Cycle.begin() + numhead, Now_Start_Cycle.end());
real_StartCycle.push_back(RealCycle);
//CVTable.erase(ConVector);
return;
}
else
{
CVTable.insert(ConVector);
int num_1 = 0;
for (int i = len - 1; i >= 0; i--)
{
if (ConVector[i] == '0')
{
string tmpVector, tmpResult;
tmpVector = StringRightMove(ConVector, len - i);
tmpResult = StringAnd(tmpVector, Initial_Conflict_Vector);
Now_Start_Cycle.push_back(len - i);
Find_real_Circle(tmpResult, CVTable, Now_Start_Cycle, len);
Now_Start_Cycle.pop_back();
}
else
num_1++;
}
Now_Start_Cycle.push_back(len + 1);
//VectorPrint(Now_Start_Cycle);
real_StartCycle.push_back(Now_Start_Cycle);
StartCycle.push_back(Now_Start_Cycle);
return;
}
}
参数如下:
string ConVector:上一任务的冲突向量
unordered_set
vector
int len:冲突向量长度
首先,要设计递归出口:什么时候退出递归?当然是右移取并得到的结果发现之前已经得到过,这时候退出递归,即找到了一个循环。如何发现本次得到的结果之前已经得到过?用set可以,但是考虑到存在如下的循环:
42222222,循环体为2,但是要先经过4才能开始循环。因此只用set只能“发现循环”,而不能“发现循环体”。这不好,unordered_set更好:一旦我在unordered_set中发现结果,由于unordered_set相比于set保留了压入时的顺序,通过定位到结果的位置,那么结果到unordered_set结尾便是循环体。
使用全局变量vector
然后设计递归体。对于递归体,我们要对ConVector的所有为0的位都照顾到,即所有为0的位都要右移然后取并得到新的冲突向量。另外,进入递归体说明还没找到循环,那么当前的冲突向量要保存在CVTable中,当前右移的值要保存在Now_Start_Cycle中。在保存并形成新的冲突向量后,继续调用函数开始递归。
在所有的0都右移之后,要将len+1保存在Now_Start_Cycle中,因为所有载入的任务在len+1之后都必定可以开始下一个任务而不产生冲突。
这样就得到了所有循环。
4、输出结果
这就是按部就班的写就可以了。没什么好说的。
for (int i = 0; i < StartCycle.size(); i++)
{
cout << "初始循环为:";
for (int j = 0; j < StartCycle[i].size(); j++)
{
cout << StartCycle[i][j] << ' ';
}
cout << "循环体为:";
for (int j = 0; j < real_StartCycle[i].size(); j++)
{
cout << real_StartCycle[i][j] << ' ';
}
cout << '\n';
}
int try_num = 0;
while (try_num<1)
{
cout << "请选择循环序号:\n";
int Cycle_x;
cin >> Cycle_x;
int Display_table[10][200] = { 0 };
int CycleNum[10] = { 0 };
int len1, len2;
len1 = StartCycle[Cycle_x].size();
len2 = real_StartCycle[Cycle_x].size();
CycleNum[0] = StartCycle[Cycle_x][0];
for (int i = 1; i < len1; i++)
CycleNum[i] = CycleNum[i - 1] + StartCycle[Cycle_x][i];
for (int i = len1; i
< b; j++)
{
int p = 1;
if (ResTable[i][j] != 0)
{
Display_table[i][j] = p;
for (int k = 0; k < len1 + len2 * 2; k++)
{
Display_table[i][j + CycleNum[k]] = p + k + 1;
}
}
}
printTable(a, CycleNum[len1 + len2 * 2 - 1], Display_table);
try_num++;
}
5、效果
6、流水线生成图片
由于使用C++生成图片太过困难,而利用python的matplotlib包生成图片又很简单。因此我不得不写了另外一个python版本。
python版本使用了numpy和matplotlib两个常用库。其算法思想与C++如出一辙,唯一要注意的就是在迭代的时候的深拷贝与浅拷贝的问题,迭代中许多变量要使用深拷贝,否则会出错,这里很难通过debug得出。
效果如下:
效果比只有0和1还是好不少的。
四、总结
在实现方面,调度算法的核心就在于查找循环,查找循环的核心就是递归算法。递归算法写好,整个问题便迎刃而解。
在理论方面,理解冲突向量是如何生成的是关键,而这需要理解冲突向量的含义,理解其含义,配合预约表中多个任务走一遍流程,就可以知道算法的原理了。
PS:代码如下:
C++版:
#include
#include
#include
#include
#include
python版:
import matplotlib.pyplot as plt
import pylab
import pandas as pd
import copy
StartCycle=list()
RealStartCycle=list()
InitialConflictVector=str()
def Create_Initial_Conflict_Vector(a:int ,b:int ,Table:list)->str:
dis=set()
for i in range(a):
tmp=[]
for j in range(b):
if (Table[i][j]==1):
tmp.append(j)
for m in range(0,len(tmp)):
for n in range(m+1,len(tmp)):
dis.add(abs(tmp[m]-tmp[n]))
result=str()
for i in range(1,b):
if i in dis:
result='1'+result
else:
result='0'+result
return result
def String_Right_Move(s:str, k:int)->str:
while (k != 0):
s = '0' + s
s = s[:-1]
k = k-1
return s
def String_And(s1:str,s2:str):
s=str()
for i in range(len(s1)):
if s1[i] == '0'and s2[i] == '0':
s = s + '0';
else:
s = s + '1';
return s;
def Find_Real_Circle(ConVector:str, CVTable:set, CVVector:list, NowStartCycle:list, lenth:int)->None:
if ConVector in CVTable:
StartCycle.append(copy.deepcopy(NowStartCycle))
numhead=0
while(ConVector!=CVVector[numhead]):
numhead=numhead+1
RealStartCycle.append(copy.deepcopy(NowStartCycle[numhead:len(NowStartCycle)]))
CVTable.remove(ConVector)
CVVector.pop()
return
else:
CVTable.add(copy.deepcopy(ConVector))
CVVector.append(copy.deepcopy(ConVector))
for i in range((lenth-1),-1,-1):
if (ConVector[i]=='0'):
TmpVector = String_Right_Move(ConVector,lenth-i)
TmpResult = String_And(TmpVector,ConVector)
NowStartCycle.append(lenth-i)
Find_Real_Circle(TmpResult, copy.deepcopy(CVTable), copy.deepcopy(CVVector), NowStartCycle, lenth)
NowStartCycle.pop()
NowStartCycle.append(lenth+1)
RealStartCycle.append(copy.deepcopy(NowStartCycle))
StartCycle.append(copy.deepcopy(NowStartCycle))
NowStartCycle.pop()
return
def Create_Cycle_Using_List(Cycle1:list,Cycle2:list,kth:int)->list:
CycleUsingList=list()
CycleUsingList.append(Cycle1[kth][0])
for i in range(1,len(Cycle1[kth])):
CycleUsingList.append(CycleUsingList[i-1]+Cycle1[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])*2+i-1]+Cycle2[kth][i])
return CycleUsingList
def Create_Matrix(InitialMatrix:list,CycleList:list,a:int)->list:
Resultlist=[[0 for col in range(CycleList[len(CycleList)-1]+len(InitialMatrix[0]))] for row in range(a)]
TmpList=[0]*len(CycleList)
for i in range(a):
for j in range(len(InitialMatrix[0])):
if InitialMatrix[i][j]==1:
Resultlist[i][j]=1
for k in range(1,len(CycleList)):
Resultlist[i][j+CycleList[k]]=1+k
return Resultlist
#预约表
ResTable_DataFrame=pd.read_csv(r'D:\LeetCode\Nonlinear Pipelining Python\file.csv',header=None)
ResTable_list=ResTable_DataFrame.values.tolist()
ResTable=tuple(ResTable_list)
a=int(input("请输入流水线的功能部件数量:"))
b=int(input("请输入流水段数量:"))
InitialConflictVector=Create_Initial_Conflict_Vector(a,b,ResTable)
s1="初始冲突向量为: %s"%(InitialConflictVector)
print(s1)
RealCVTable=set()
RealCVVector=list()
RealNowStartCycle=list()
Find_Real_Circle(InitialConflictVector,RealCVTable,RealCVVector,RealNowStartCycle,b-1)
for i in range(len(StartCycle)):
s2 = "进入循环的序列为: %s 循环体为: %s" %(StartCycle[i],RealStartCycle[i])
print(s2)
for i in range(3):
n=int(input("请选择查看第几个循环:"))
CycleUsingList=Create_Cycle_Using_List(StartCycle,RealStartCycle,n)
CycleUsingMatrix=Create_Matrix(ResTable_list,CycleUsingList,a)
plt.imshow(CycleUsingMatrix, interpolation='nearest')
pylab.show()