第三届阿里天池数据库大赛 - adb赛道攻略

简介

今年暑假参加了阿里天池举办的第三届数据库大赛,精力主要放在了“高性能分析型查询引擎赛道”,最终排名第九。该赛道中,选手需要用 Java 设计实现一个包含 quantile 分析函数的程序,导入指定的数据,数据均为正的64位整型。导入数据后需回答若干次 quantile 查询。quantile 函数的定义为:

String quantile(String column, double p)

column为查询列,p为百分比,范围[0,1]。函数应返回将列的所有值排序后的第 N * p 个值。如果 N * p 不为整数,则向上取整。查询前,测评程序还会调用一个用户可以自己定义的load函数对数据提前加载。

这次比赛是阿里云联合英特尔傲腾持久内存(PMem)举办的。测评时,数据文件会存储在一块 intel pmem 上,用户也可以在一块pmem上自行读写文件。pmem是通过挂载到文件系统里给用户使用到,用户不能使用第三方库对pmem进行读写。

最后分别在线上预赛和复赛取得了第八名和第九名的成绩。预赛和复赛的差别还是比较大的:预赛提供的是一台4核4G内存的机器,数据文件仅有一个,包含1亿行共两列数据,测评时会进行10次查询;复赛则提供了一台8核8G的机器,数据文件各有两个,每个文件包含两列数据,数据量为10亿行,总计四列20亿行数据。测评最开始会查询10次,接着把程序kill掉,再次启动后用8个线程并发查询4000次。

这些差别使得我们在初赛和复赛完全采用了不同的架构和实现方法,本文将主要介绍复赛代码的核心思路。

整体架构

比赛提供的数据文件大小总计至少有70G,将文本转为64位整数后至少也有30G,同时程序还会被kill。这意味我们不可能把所有数据全部load进内存里,而应当将数据按一定规律分割后刷写进持久化存储介质中再进行查询。pmem的读写速度高于一般的存储介质,这使得I/O不再成为系统的瓶颈,如何利用好CPU资源才是比赛制胜的关键。

为了完成查询,我们的程序需要完成如下核心流程:

  • 读文件:将源文件的数据读出来
  • 数据解析:读出来的数据全部都是不定长的字符,需要将字符解析为整数
  • 分桶:为了能把数据放到内存中进行查询,需要借助 Radix Sort 的思想将数据分割到一个个比较小的桶里,查询时根据桶大小的前缀和确定所查数据在哪个桶里
  • 查询:根据用户输入,通过前缀和确定所查数据在哪个桶里,对桶中数据用快速选择算法确定答案

为了利用好IO和CPU资源,这些流程之间还应当是异步的。在尝试过几种方案后,我们最终采用了双缓冲队列来实现异步读写。

文件读写

本题中,我们的程序是在JDK8下编译运行,且不得调用第三方库。根据英特尔的官方文档 Java* Support for Intel® Optane™ DC Persistent Memory,这种情况下应使用DirectByteBuffer + FileChannel.map 操作pmem,底层其实就是通过 mmap 和 memcpy 等系统调用读写文件。

实际测试中我们发现,这种方法的读性能还算可以,但写性能极差。8线程并发写入30G数据就至少需要70s的时间,这么大的开销是无法接受的。

进一步的测试发现,在对齐到16KB的情况下,使用 FileChannel 本身对文件进行读写要比上述方法快不少。为了方便地将文件中的数据读入堆外内存、把堆外内存数据写入文件,我们直接用反射将 native 的读写方法取出,稍作封装后就应用于实际的文件操作中。这么做也能避免执行标准库中一些不必要的逻辑。

当然,在实际生产环境中这么做有比较大的风险,也不符合一般的编码规范,仅作比赛时使用。


public class UnsafeUtil {

    public static final Unsafe UNSAFE;

    private static Method pwrite0;

    private static Method pread0;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe) field.get(null);

            // get write/read method
            Class<?> fdp = Class.forName("sun.nio.ch.FileDispatcherImpl");
            pwrite0 = getMethod(fdp, "pwrite0", FileDescriptor.class, long.class, int.class, long.class);
            pread0 = getMethod(fdp, "pread0", FileDescriptor.class, long.class, int.class, long.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Bundle reflection calls to get access to the given method
	private static Method getMethod(Class<?> cls, String name, Class<?>... params) throws Exception {
		Method m = cls.getDeclaredMethod(name, params);
		m.setAccessible(true);
		return m;
	}

    public static void pwrite(FileDescriptor fd, long address, int len, long position) {
        try {
            pwrite0.invoke(null, fd, address, len, position);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void pread(FileDescriptor fd, long address, int len, long position) {
        try {
            pread0.invoke(null, fd, address, len, position);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

比赛中自始至终令我们疑惑的是,厂商说过,即使不能使用 pmdk,使用 mmap 的效率还是要比 read/write 要高的,是不是我们的 mmap 使用方法出了问题?看了决赛答辩其他选手的方案才明白,问题出在 Java 的内存回收上。JVM里mmap的内存释放是由 JVM 自己控制的,最好的方法是要异步释放 mmap 申请的内存,同时 mmap 的内存也要进行分块以增加 locality。

数据解析

读取数据时,我们实际读到的是一连串不定长的、被逗号和换行符分割的文本数字,例如:

8910346109626313592,655566368837043341
8503816627163320049,2082353146547479655
1267292120702928131,6459882484077174740
207450898312,8798338643394196680

最简单的方法是按行将这些数据作为字符串读入,再按逗号和换行符进行分割,解析分割后的字符串为long。众所周知,字符串操作的开销是比较大的,为了避免字符串操作,最开始我们简单地通过字符的值推导出long值:

long val = 0;
for (int i = 0; i < size; i++) {
    if (readBuffer[i] == ',') {
        // successfully read a number belongs to cloumn 1
        val = 0;
        break;
    } else if (readBuffer[i] == '\n') {
        // successfully read a number belongs to cloumn 2
        val = 0;
        break;
    } else {
        val = val * 10 + (readBuffer[i] - '0');
    }
}

接下来测试时发现,代码中if语句的判断有着比较大的开销。我们读取的数据量有至少70GB,绝大部分字符都不是逗号和换行符,上述写法会使得CPU耗费多时间在分支预测上。由于数据中分隔符总以先逗号再换行符的规律出现,我们可以把代码修改成:

long val = 0;
int i = 0;
while (i < size) {
    for (;;) {
        if (readBuffer[i++] == ',') {
            // successfully read a number belongs to cloumn 1
            val = 0;
            break;
        }
        val = val * 10 + (readBuffer[i] - '0');
    }

    for (;;) {
        if (readBuffer[i++] == '\n') {
            // successfully read a number belongs to cloumn 2
            val = 0;
            break;
        }
        val = val * 10 + (readBuffer[i] - '0');
    }
}

考虑到文件中的数字是在[0, 2^63]上均匀分布的,因此绝大部分数字的长度都是18或19。我们可以分两次把数字的前16个字节读入long中,这16个字节里大概率是没有分隔符的。然后对long进行暴力解析,如果这16个字节中含有分隔符(概率非常低)再使用传统方法做处理。这一方法进一步降低了判断和循环带来的开销:

long parse8bytes(long data) {
    return ((byte) data & 0x0F) * 10000000L +
        (((byte) data >> 8) & 0x0F) * 1000000L +
        (((byte) data >> 16) & 0x0F) * 100000L +
        (((byte) data >> 24) & 0x0F) * 10000L +
        (((byte) data >> 32) & 0x0F) * 1000L +
        (((byte) data >> 40) & 0x0F) * 100L +
        (((byte) data >> 48) & 0x0F) * 10L +
        (((byte) data >> 56) & 0x0F);
}

后来我们在网上发现了一种通过一系列位运算对8bytes数据进行parse的方法:Fast Integer Parse

该方法通过一系列位运算和尽可能少的乘法将连续的字符转为整数。

private static long parse8bytes(long chunk) {
        chunk = ((chunk & 0x0f000f000f000f00L) >> 8) 
            + (chunk & 0x000f000f000f000fL) * 10;
        chunk = ((chunk & 0x00ff000000ff0000L) >> 16) 
            + (chunk & 0x000000ff000000ffL) * 100;
        chunk = ((chunk & 0x0000ffff00000000L) >> 32) 
            + (chunk & 0x000000000000ffffL) * 10000;
        return chunk;
}

这一方法其实未必比上述的暴力方法更好。根据网友测试,它在一些处理器上有非常显著的效果(主要是AMD处理器),而在一些处理器上还不如上述方法。但在线上,这一方法还是比暴力方法略快一点,能带来约1.5s的提升。

分桶

为了把查询的数据缩小到一个尽可能小的范围,我们借助 Radix Sort 的思想对数据进行分桶,即根据数字的前若干位确定数据在哪个桶中。我们可以记录桶中数据个数,进而通过前缀和求出查询数据所在的桶。

这么做的话,桶数只能为2的N次方,有的人可能会认为这不够灵活,应当采用除法进行分桶。但根据前若干位分桶的话,只需要一个移位操作即可确定所在桶,开销远远小于除法。实际比赛中,我们用数字的前12位分桶,每列数据有桶2048个,4列数据共计有8192个桶。每个线程只读一个文件,也就是说只会处理两列数据,实际上要处理的桶数为4096。

// we have 2^(BUCKET_BITS-1) buckets
bucket = (int) (num >> (64 - BUCKET_BITS));

对于每个读线程,我们会释放一大段内存用于分桶。每个桶的大小为32KB,共计需要256MB的内存。由于数据是随机分布的,因此会造成很大的写跨度,进而引起比较严重的 cache miss 问题,另外对于 DRAM 来说,这种随机写的开销也比顺序写要大不少。为了缓解这一问题,我们设计了一个二级桶,每个桶大小为256bytes,每个线程有 4096 * 256b = 2MB 大小的二级桶,这些二级桶是可以常驻CPU Cache的。分桶时先把parse出的数据写到小桶中,某个小桶写满时再 memory copy 到大桶中。这一方法效果很好,能带来约5s的提升。

并发模型

从程序逻辑上来看,我们的并发模型是典型的 生产者 - 消费者 模型:若干线程作为生产者负责读和parse源文件,若干线程作为消费者负责将最终数据写入存储空间中。一般来说我们会采用队列来同步生产者和消费者的数据。一开始我们使用这种方法时,发现Java标准库中的队列,其进出的开销是很大的。我们进而考虑,是不是可以舍弃标准库中的队列,并且用尽可能少的同步操作来完成异步读写线程之间数据的交换?

调研过后,我们最终采用了双缓冲队列的并发模型。考虑现在有A, B两个线程:A线程负责读出源数据、parse并分桶,B线程负责将分桶后的数据落盘。为了能让A、B间交换数据并实现异步操作,我们释放两段能容纳4096个桶的缓冲区(内存),两个缓冲区分别对应着两个互斥锁LockA,LockB。生产者和消费者要控制那个缓冲区先要取得对应的锁。缓冲区会经历以下过程:

A 线程只要将缓冲区的任意一个桶写满,就立即释放对 LockA 的控制,等待控制 LockB 。一旦 B 线程将缓冲区数据全部下盘,释放 LockB,A立即控制 LockB,并开始刷写 LockB 对应的缓冲区数据。同时B线程锁住 LockA ,开始新一轮写操作。这样就完成队列交换。

实际比赛中,我们有8线程读出源数据、parse并分桶,对应也有8线程做数据落盘。采用双缓冲队列的缺点就是无法精细控制读写线程数量,可能造成一定程度的CPU资源浪费。

考虑到对于一个锁,最多只有两个线程去竞争,因此我们放弃使用标准库中的锁,而是基于CAS操作自己设计了一个 MyLock 用于双缓冲队列中。它通过一个 CAS 变量标记当前锁是否被占用,若被占用则挂起竞争线程,否则即可占有锁并改变标记。释放锁时,如果该锁上有被挂起的线程,则对其进行回复。这样,仅需极少的 CAS 操作即可完成线程间的同步。

public class MyLock {

    private Thread t;

    private AtomicBoolean flag;

    public MyLock() {
        flag = new AtomicBoolean(false);
        t = null;
    }

    public void lock() {
        while (!flag.compareAndSet(false, true)) {
            this.t = Thread.currentThread();
            UnsafeUtil.UNSAFE.park(false, 0L);
        }
    }

    public void unlock() {
        flag.set(false);
        if (this.t != null) {
            UnsafeUtil.UNSAFE.unpark(this.t);
            this.t = null;
        }
    }

}

查询

查询我们只是简单地采用了 Quick Select 算法。这是一种由 Quick Sort 演变而来,用于求 top k 或 rank k 问题的算法。由于 pmem 也具有顺序读速度远大于随机读速度读特点,因此在落盘时我们需要把属于同一个分区读数据刷写到连续的存储空间里,这样能大大减小读数据的开销。

本次比赛我们在查询上做的并不好,并没有完全利用到机器的性能。测评程序默认开8个线程查询,仅仅使用默认配置还远远不能打满cpu负载和pmem的读速度,应考虑并发地处理同一个测评线程中地查询。

总结

本次大赛中,由于对 pmem 特性不熟悉,并且也不是很了解 Java nio 库相关的内容,导致我们把很大一部分时间花在了研究如何读写文件和解析数据上,最后找到的方案也还行吧。遗憾的是我们的并发模型设计得不怎么样,不能做到把一些能够异步执行的逻辑真正做到异步执行,也没能比较好地控制线程数量,对 CPU 的利用率不高。并且在查询上也没有把 CPU 负载和 pmem 的读速度打到极致。

标签:

更新时间: