moongen学习

学习了一下MoonGen,主要参考官网和tutorial。

usage introduction

Device setup

master函数是必须的,用来进行设备的配置,一个简单的设置如下

1
2
3
4
5
6
7
8
9
10
11
12
local device = require "device"
function master(txNum)
-- use specified NIC number with
-- no listening and one transmission queues
txDev = device.config{
port = txNum,
rxQueues = 0,
txQueues = 1,
}
device.waitForLinks()
send(txDev:getTxQueue(0))
end

如果要用到receive filter configuration,要在创建的时候设置设备。在device.config中,设置rssNQueues = N,来创建N个队列。此外,还可以通过rssFunctions来控制hash值,选项有RSS_FUNCTION_IPV4.._IPV6.._IPVX_TCP.._IPVX_UDP。示例如下:

1
2
3
4
5
6
7
8
9
10
local device = require "device"
txDev = device.config{
port = 0,
rxQueues = 4,
rssNQueues = 4,
rssFunctions = {
device.RSS_FUNCTION_IPV4,
device.RSS_FUNCTION_IPV4_TCP,
}
}

packet generation

packets是异步发送、在memory buffer中生成的,所以需要一个在memory pool中的buffer array。

因为大多数packets应该是一样的,所以一般通过一个函数定义。

注意:应该在memory pool中定义packet的值,而不是在generation loop中,否则性能会大大下降。

最简单的方法就是在char buffer中手动设置一个ethernet header,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local dpdk = require "dpdk"
local memory = require "memory"
function send(queue)
local mem = memory.createMemPool(function(buf)
local data = ffi.cast("uint8_t*", buf.pkt.data)
for i = 0, 11 do
data[i] = i -- fill in mac addresses
end
data[12] = 0x12 -- set type to ethernet
data[13] = 0x34
end)
local bufs = mem:bufArray()
while dpdk.running()
bufs:alloc(60) -- size of each packet
-- ⇑ sets up each packet with the function above
-- ← here, single packets could be modified
queue:send(bufs) -- schedule sending
end
end

简单一点,raw buffer可以先被转换成各种包的格式,然后直接填就可以了。在lua/include/proto/中可以找到定义的相应格式。比如:

1
2
3
4
5
6
7
local mem = memory.createMemPool(function(buf)
buf:getEthernetPacket():fill{
ethSrc = txDev, -- use device mac
ethDst = "00:01:02:03:04:05",
ethType = 0x1234,
}
end)

如果要定义一种新的格式,按照lua/include/proto/newProtocolTemplate.lua的方式,拷贝并修改。

如果要修改某一种特定的包,在分配buffer后和enqueue之前修改,方法类似。

如果需要控制发包器的速度,通过调用函数queue:setRate(MBit/s)

如果需要暂停LuaJIT VM,通过dpdk.sleepMillis(time)

running parallel tasks

为了支持并行,通过引入dpdk.lauchLua("funcname", arg0, ...)来用一个新的LuaJIT VM跑一个新的slave task。一般的调用方法为

1
2
3
local dpdk = require "dpdk"
dpdk.launchLua("funcname", arg0, ...)
dpdk.waitForSlaves() -- wait for child termination

statistics

stats模块提供了统计功能,收集完数据之后,可以被写到stdout或者一些其他的文件中。支持的输出类型有plaincsvini

packet counter是和device绑定的,要创建一个新的packet counter,可以使用rxCtr = stats:newDevRxCounter(device, "plain")或者newDevTxCounter(..)

newPktRxCounter("your counter name", "plain")或者newPktTxCounter(..)可以通过调用它的countPacket(singleBuffer)来将packet buffer传递给它们来实现更新。

newManualTxCounter("your counter name", "plain")可以手动计数。可以通过updateWithSize(packet_count, each_size)来更新。

对于所有counter,update()方法都可以频繁调用,可以显示当前数据。

histogram模块可以收集频率方面的数据。

下面的示例使用了device和package两种计数器,packet size记在histogram里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
local dpdk = require "dpdk"
local device = require "device"
local histogram = require "histogram"
local memory = require "memory"
local stats = require "stats"
function master(rxPort, saveInterval)
local saveInterval = saveInterval or 60
local rxDev = device.config{
port = rxPort,
dropEnable = false,
}
device.waitForLinks()
local queue = rxDev:getRxQueue(0)
local bufs = memory.bufArray()
-- create the device receive counter
local rxCtr = stats:newDevRxCounter(queue.dev)
-- and the packet receive counter to detect
-- packets that were dropped on the NICNIC
local pktCtr = stats:newPktRxCounter("pkts", "plain")
local hist = histogram:create()
local timer = timer:new(saveInterval)
while dpdk.running() do
-- wait max 100ms for new data
local rx = queue:tryRecv(bufs, 100)
for i = 1, rx do
local buf = bufs[i]
local size = buf:getSize()
hist:update(size)
pktCtr:countPacket(buf)
end
bufs:free(rx)
rxCtr:update()
pktCtr:update()
if timer:expired() then
timer:reset()
hist:print()
hist:save("packet_sizes.csv")
end
end
-- and print statistics, those should be the same.
rxCtr:finalize()
pktCtr:finalize()
end

如果要使用manual counter,可以通过queue:send()的返回值来实时更新。注意到发送动作是异步的,但是返回值仍然可以用来做计数。和之前的一样,finalize()函数可以用来输出最终结果。而updateWithSize()输出每一秒的实时结果。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function send(queue)
local mem = ...
local packetSize = 250
-- create manual counter
local txCtr = stats:newManualTxCounter(port, "plain")
local bufs = mem:bufArray()
while dpdk.running() do
bufs:alloc(packetSize)
bufs:offloadUdpChecksums()
local sentCount = queue:send(bufs)
-- register new data: sentCount * packetSize
txCtr:updateWithSize(sentCount, packetSize)
end
txCtr:finalize()
end

同时也可以在接收包的时候,可以对包的一些特定内容进行统计。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
function recv(queue)
local bufs = memory.bufArray()
local counters = {}
while dpdk.running() do
-- block until some data was received
local rx = queue:recv(bufs)
for i = 1, rx do
local buf = bufs[i]
-- cast the buffer to a known protocol
local port = buf:getUdpPacket().udp:getDstPort()
local ctr = counters[port]
-- create counters dynamically
if not ctr then
ctr = stats:newPktRxCounter(port, "plain")
counters[port] = ctr
end
-- record the packet
ctr:countPacket(buf)
end
bufs:freeAll()
end
-- for each observed destination port, print stats:
for _, ctr in pairs(counters) do
ctr:finalize()
end
end

timestamping

用来测时间戳的包定义在lua/include/proto/ptp.lua。可以通过timestamping:newTimestamper(txq, rxq)来创造一个新的timestamper。创建的timestamper可以在l2层使用,或者也可以通过udp传输。

以下的例子在两个queue之间每隔0.01s测量一次延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
local ts = require "timestamping"
function timerTask(txq, rxq, size)
-- create the timestamper for measuring
-- between those queues
local timestamper = ts:newTimestamper(txq, rxq)
local hist = histogram:new()
local rateLimiter = timer:new(0.01)
while dpdk.running() do
rateLimiter:reset()
hist:update(timestamper:measureLatency(size))
rateLimiter:busyWait()
end
hist:print()
hist:save("histogram.csv")
end

task communication

moongen提供两种交流的方法。

一种是pipes。一般在一个公共背景下创建pipe,比如master函数。以下就是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
local pipe = require "pipe"
-- create a new pipe in the parent task
local p = pipe:newSlowPipe()
p:send(0, 13, 37, 42) -- send array
p:send("the cake is a lie") -- send string
-- or send a table
p:send({235, lol = "rofl", subtable = {1}})
-- number of waiting messages
local enqueued = p:count()
-- receiving
local a, b, c, d = p:recv() -- equals tryRecv(10)
local txt = p:tryRecv(100) -- wait time microseconds
-- and return answer

-- pass this pipe when creating another task
-- it can then access it like above.
dpdk.launchLua("somefunnction", p)
dpdk.waitForSlaves()

另一种是namespace,就是在LuaJIT VM之间声明用lua table表示的全局变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
local dpdk = require "dpdk"
local namespaces = require "namespaces"
function master()
local space = namespaces:get("mine")
space.string = "data!"
space.answer = 42
space.table = { black = "mesa", { 1 } }
dpdk.launchLua("slave"):wait()
end
function slave()
-- can access the same namespace!
local slavespace = namespaces:get("mine")
print("data? " .. slavespace.string)
end

traffic patterns

有的时候网卡只支持固定速度,而却需要人为降速,这个时候moongen会通过在包之间添加gap(就是在空隙发送损坏的包)来达到降速的效果。

可以通过buf.setDelay(bytes)来设置延时,如果要设置随机延时,有泊松分布的延时产生poissonDelay(average_wait)。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
function send(txDev)
local mem = ...
local bufs = mem:bufArray()
while dpdk.running() do
bufs:alloc(size)
for _, buf in ipairs(bufs) do
local avg = rateToByteDelay(rate, size)
local delay = poissonDelay(avg)
buf:setDelay(delay)
end
queue:sendWithDelay(bufs)
end
end