Neo4j系列博客-高级应用

引入

在之前的学习中,大致明白了 Neo4j 的基本操作、程序开发、数据库管理等操作,对于一些基本的操作已经足够,而现在则是通过一些高级的使用来探索其他功能的启发。


高级索引

主要介绍空间索引和中文全文索引两个部分。

空间索引

空间索引的程序库是 Neo4j Spatial,他的主要作用是对数据进行空间操作,开发人员可以向已经具有位置信息的数据添加空间索引,并且对数据进行空间计算操作。另外 Neo4j Spatial 提供了数据导入 GeoToolsJava 编写的开源 GIS 工具包),从而启用 GeoTools 的应用程序,比如 GeoServer (共享空间地理数据的开源服务器)和 uDig (开源地理桌面数据访问、编辑、呈现框架)。

Neo4j Spatial 的主要功能包括:

  • 可以将 RESI Shapefile (SHP)Open Street Map (OSM) 文件导入 Neo4j 的应用程序。
  • 支持常见的几何图形:点、线、多边形等。
  • 用于快速搜索几何形状的 RTree 结构。
  • 支持搜索期间的拓扑操作(包含、属于、相交、覆盖等)。
  • 只要提供从图形映射到几何形状的适配器,就可以对任何图形进行空间操作,而不负责其数据的存储方式。
  • 能够使用预配置的过滤器将单个图层或数据拆分成多个之图层或视图。
基本使用

使用 Neo4j Spatial 最简单的方法就是获取 neo4j-spatial-*.**-neo4j-*.*.*-server-plugin,然后将其复制到 plugin 目录下,重新启动 Neo4j 服务即可使用。
使用 Cypher 查询一样调用 Neo4j 空间过程,为节点添加空间索引,并且执行多个空间点的距离、交叉查询等操作。

1
2
3
4
5
6
7
8
9
// 创建 geom 的点图层
CALL spatial.addPointLayer('geom')
CALL spatial.layers()
// 建立经度为 15.2 ,维度为 60.1 的空间点
CREATE (n:Node {latitude: 60.1, longitude: 15.2}) WITH n
// 将创建的点加入到 geom 的点图层中
CALL spatial.addNode('geom', n) YIELD node RETURN node
// 查询精度在 60.0 到 60.1 之间,纬度在 15.0 到 15.3 之间的空间点
CALL spatial.bbox('geom', {lon: 15.0, lat: 60.0}. {lon: 15.3, lat: 60.1})
原理

Neo4j 空间索引是 RTree 索引,它是以扩展的方式开发的,允许在必要的时候添加其他索引。空间索引可以在数据生成的过程中添加,也可以为现有的空间数据添加索引,并且实际上可以导致不同的图结构。
需要在数据生产的过程中添加索引,最简单的方法就是创建合适的数据图层,最常用的两种:

  • SimplePointLayer:一个可编辑的数据图层,仅允许向数据库添加点数据。
  • Editablelayer(Impl):默认的可编辑图层实现,可以处理任何简单的集合类型。存储格式为 WKB,专门用于几何地理的二进制格式。

定义几何图形的集合叫图层,其中包含可用于查询的索引,如果在图层中添加和修改图形,则这个图层就是可编辑图层。

空间索引查询类型

RTree 索引可以实现的查询方式有:

  • 包含(Contian)。
  • 覆盖(Cover)。
  • 被覆盖(Covered By)。
  • 交叉(Cross)。
  • 不相交(Disjoint)。
  • 相交(Intersect)。
  • 交叠(Oberlap)。
  • 接触(Touch)。
  • 包含(Within)。
  • 在一定距离内(With Distance)。
Java 构建 Neo4j 空间索引

Neo4j 自带一个用于导入 ESRI Shapefile 数据的程序,ShapeFileImporter 将为每个导入的 Shapefile 创建一个新的图层,并将每个几何图形作为 WKB 存储在单个节点的单个属性中。

  1. 导入 shape 文件
    下面的代码将 roads.shp 导入到 layer_roads 图层中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    File storeDir = new File(dbPath);
    GraphDatabaseService database = new GraphDatabaseFactory().newEmbeddedDatabase(storeDir);
    try (Transaction tx = database.beginTx()) {
    ShapefileImporter importer = new ShapefileImporter(database);
    importer.importFile("roads.shp", "layer_roads");
    tx.success();
    } finally {
    database.shutdown();
    }

    使用 Neo4j Spatial 过程调用,可以达到同样的效果

    1
    2
    CALL spatial.addWKTLayer('layer_roads', 'geometry')
    CALL spatial.importShapefileToLayer('layer_roads', 'roads.shp')
  2. 导入开放街道地图文件(OSM
    导入 OSM 文件要比 SHP 文件复杂很多,因为导入 OSM 文件需要分两个过程进行,而第一个过程需要批处理导入。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    File dir = new File(dbPath);
    // 设置图层名称
    OSMImporter importer = new OSMImporter("OSM");
    // 批量导入配置
    Map<String, String> config = new HashMap<String, String>();
    config.put("neostore.nodestore.db.mapped_memory", "90M");
    config.put("dump_configuration", "true");
    config.put("use_memory_mapped_buffers", "true");
    BatchInserter batchInserter = new BatchInserterImpl(dir, config);
    importer.importFile(batchInserter, "map.osm", false);
    BatchInserter.shutdown();
    GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase(dir);
    importer.reIndex(db, 10000);
    db.shutdown();
常见的 Cypher 查询
  1. WithinDistance 查询
    WithinDistance 查询是空间查询中最常用的一种方式,使用的是球面距离,计算采用 OrthodromicDistance 算法。

    1
    start n = node:geom('WithinDistance:[21.331937, 120.638154, 0.1]') return n limit 10
  2. WithinWKTGeometry 查询
    查询由点 (120.678966, 31.300864) 与点 (120.978966, 31.330864) 构成的 Ploygon 多边形范围内的点

    1
    start n = node.geoindex('WithinWKTGeometry:PLOYGON ((120.678966, 31.300864, 120.978966, 31.330864, 120.978966, 31.300864, 120.678966, 31.300868))') return n limit 10
  3. bbox 矩形查询
    查询由点 (120.678966, 31.300864) 与点 (120.978966, 31.330864) 构成的 BBox 矩形范围内的点

    1
    start n = node.geom('bbox:[120.678966, 120.978966, 31.300864, 31.330864]') return n limit 10

中文全文索引

Neo4j 也提供了全文索引机制,并且是基于 Lucene 实现的,但是在默认情况下 Lucene 只提供了基于英文的分词器,如果将之用于中文,则会将中文分割成单个的字,这样就破坏了中文的语义结构,而且针对特定领域的中文分词,可能还需要自定义词典。

IKAnalyzer 分词器

IKAnalyzer 是一个开源的、基于 Java 开发的轻量级中文分词工具包,但是目前官方仅支持 Luence 3.0 版本。

基本使用
  1. 自定义词典
    词典文件:自定义词典文件后缀名为 .dic 的词典文件,必须使用 UTF-8 编码保存。
    词典配置: IKAnalyzer.cfg.xml 必须在 src 目录下。词典文件可以放置在任意位置,但是在配置文件中必须配置正确。

    1
    2
    3
    4
    5
    6
    7
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCUTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd" >
    <properties>
    <commnet>IK Analyzer 扩展配置</commnet>
    <entry key="ext_dict">ext.dic;</entry>
    <entry key="ext_stopwords">stopwords.dic;</entry>
    </properties>
  2. 嵌入式模式下的中文全文索引
    指定 IKAnalyzer 作为 Luence 分词的 Analyzer,并对一个 Label 下的所有 Node 指定属性新建全文索引。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    try (Transaction tx = database.beginTx()) {
    Map<String, String> config = new stringMap(IndexManager.PROVIDER, "luence", "type", "fulltext", "analyzer", IKAnalyzer.class.getName());
    IndexManager index= database.index();
    Index<Node> newFullTextIndex = index.forNodes("FullTextIndex", config);
    ResourceIterator<Node> nodes = database.findNodes(DynamicLabel.label(LabelName));
    while (nodes.hasNext()) {
    Node node = nodes.next();
    Object text = node.getProperty("text", null);
    newFullTextIndex.add(node, "text", text);
    }
    tx.success();
    }

Docker 部署 Neo4j

概览

默认情况下 Neo4j 的镜像开放了三个用于远程访问的端口:

  • 7474 用于 HTTP 协议。
  • 7473 用于 HTTPS 协议。
  • 7687 用于 BOLT 协议。

此外还指定了两个位置用于存放数据文件和日志文件:

  • /data 用于数据库在容器的外部持久化。
  • /logs 用于允许访问 Neo4j 的日志文件。

Docker 中运行 Neo4j:

1
docker run --publish=7474:7474 --publish=7687:7687 --volume=$HOME/neo4j/data:/data --volume=$HOME/neo4j/logs:/logs neo4j:3.1

可通过 -env NEO$J_AUTH=neo4j/<password> 指令设置容器的登录密码。

配置

有三种方式可以修改 Neo4j 容器的配置:

  • 设置环境变量。
  • 指定 /conf 存储位置。
  • 构建新的镜像。
  1. 环境变量

    • NEO4J_AUTH:控制身份验证。
    • NEO4J_dbms_memory_pagecache_size:本地内存缓存的大小。
    • NEO4J_dbms_memory_heap_maxSize:堆大小。
    • NEO4J_dbms_txLog_rotation_retentionPolicy:保留逻辑日志的大小。
    • NEO4J_dbms_allowFormatMigration:是否启用升级。
    • NEO4J_dbms_mode:数据库模式。
    • NEO4J_causalClustering_expectedCoreClusterSize:启动时的集群初始大小。
    • NEO4J_causalClustering_initialDiscoveryMembers:集群核心成员的初始网络地址/端口号。
    • NEO4J_causalClustering_discoveryAdvertisedAddress:成员发现地址/端口号。
    • NEO4J_causalClustering_transactionAdvertisedAddress:广播事务处理地址/端口号。
    • NEO4J_causalClustering_raftAdvertisedAddress:广播集群通信地址/端口号。
    • NEO4J_ha_serverId:服务器的唯一标识。
    • NEO4J_ha_host_corrdinationHA 模式下集群协调通信的地址。
    • NEO4J_ha_host_dataHA 模式下用于数据传输的地址。
    • NEO4J_ha_initialHosts:集群的其他成员的地址/端口列表。
  2. /conf 配置文件配置
    /conf 中的配置都会覆盖镜像中的配置文件,包括已经生效的、为容器提供环境变量。如果要修改文件中的一个值则必须保证其他部分都是正确的。

    1
    docker run --detach --publish=7474:7474 --publish=7687:7687 --volume=$HOME/neo4j/data:/data --volume=$HOME/neo4j/logs:/logs --volume=$HOME/neo4j/conf:/conf neo4j:3.1
  3. 因果集群
    每个容器必须有一个与其他容器通信的网路路由,核心服务器必须要设置 NEO4J_causalClustering_initialDiscoveryMembersNEO4J_causalClustering_scpectedCoreClusterSize,而只读副本则只需要设置 NEO4J_causalClustering_initialDiscoveryMembers

    1
    2
    3
    4
    5
    6
    7
    docker network create --driver=bridge cluster
    docker run --name=core1 --detach --network=cluster --publish=7474:7474 --publish=7687:7687 --env=NEO4J_dbms_mode=CORE --env=NEO4J_causalClustering_scpectedCoreClusterSize=3 \
    --env=NEO4J_causalClustering_initialDiscoveryMembers=core1:5000,core2:5000.core3:5000 neo4j:3.1-enterprise
    docker run --name=core2 --detach --network=cluster --env=NEO4J_dbms_mode=CORE --env=NEO4J_causalClustering_scpectedCoreClusterSize=3 \
    --env=NEO4J_causalClustering_initialDiscoveryMembers=core1:5000,core2:5000.core3:5000 neo4j:3.1-enterprise
    docker run --name=core3 --detach --network=cluster --env=NEO4J_dbms_mode=CORE --env=NEO4J_causalClustering_scpectedCoreClusterSize=3 \
    --env=NEO4J_causalClustering_initialDiscoveryMembers=core1:5000,core2:5000.core3:5000 neo4j:3.1-enterprise
  4. 高可用性集群
    运行高可用性集群则每个容器必须具有可以连接到其他容器的网络路由 NEO4J_ha_host_corrdinationNEO4J_ha_host_dataNEO4J_ha_initialHosts

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    docker network create --driver=bridge cluster
    docker run --name=instance1 --detach --network=cluster --hostname=instance1 --publish=7474:7474 --publish=7687:7687 --volume=$HOME/neo4j/logs:/logs --env=NEO4J_dbms_mode=CORE \
    --env=NEO4J_ha_host_corrdination=HA --env=NEO4J_ha_serverId=1 --env=NEO4J_ha_host_corrdination=insatnce1:5001 --env=NEO4J_ha_host_data=instance1:6001 \
    --env=NEO4J_ha_initialHosts=instance1:5001,instance2:5001,instance3:5001 neo4j:3.1-enterprise
    docker run --name=instance2 --detach --network=cluster --hostname=instance2 --publish=7474:7474 --publish=7687:7687 --volume=$HOME/neo4j/logs:/logs --env=NEO4J_dbms_mode=CORE \
    --env=NEO4J_ha_host_corrdination=HA --env=NEO4J_ha_serverId=2 --env=NEO4J_ha_host_corrdination=insatnce2:5001 --env=NEO4J_ha_host_data=instance2:6001 \
    --env=NEO4J_ha_initialHosts=instance1:5001,instance2:5001,instance3:5001 neo4j:3.1-enterprise
    docker run --name=instance3 --detach --network=cluster --hostname=instance3 --publish=7474:7474 --publish=7687:7687 --volume=$HOME/neo4j/logs:/logs --env=NEO4J_dbms_mode=CORE \
    --env=NEO4J_ha_host_corrdination=HA --env=NEO4J_ha_serverId=3 --env=NEO4J_ha_host_corrdination=insatnce3:5001 --env=NEO4J_ha_host_data=instance3:6001 \
    --env=NEO4J_ha_initialHosts=instance1:5001,instance2:5001,instance3:5001 neo4j:3.1-enterprise
  5. 使用 Cypher-Shell
    Cypher-Shell 可以在容器内使用以下命令在本地运行:

    1
    docker exec --interactive --tty <container> bin/cypher-shell

Neo4j 与图计算

Apache Spark 是一种集群内数据处理的解决方案,可以轻松地在多个机器上进行大规模数据处理,此外还有 GraphXGraphFrames 两个框架,可以专门用于对数据进行图计算操作。
Spark 可以与 Neo4j 搭配进行外部数据处理解决方案,处理过程如下:

  • 所需要分析的子图从 Neo4j 导出到 Spark
  • 利用 Spark 集群进行图计算。
  • 将计算结果返回到 Neo4j 中。
  • Cypher 语言或者其他操作工具进行查询。

Neo4j-Spark-Connector

Neo4j-Spark-Connector 使用二进制 Bolt 协议从 Neo4j 中导入和导出数据,同时提供了 Spark-2.0RDDDataFrameGraphX GraphGraphFrames 等,可以自由地选择如何使用 Spark 处理数据。
一般的分析过程如下:

  • 创建 org.neo4j.spark.Neo4j(sc)
  • 设置 cypher(query, [params], nodes(query, [params]), rels(query, [params]) 作为直接查询或者 pattern("Label", Seq("REL"), "Label2")pattern(("Label1", "prop1"), ("REL", "prop"), ("Label2", "prop2"))
  • 为并行计算定义 partitions(n), batch(size), rows(count)
  • 选择返回的数据类型。
    • loadRowRdd, loadNodeRdds, loadRelRdd, loadRdd[T]
    • loadDataFrame, loadDataFrame(schema)
    • loadGraph[VD, ED]
    • loadGraphFrame[VD, ED]

下面实现一个简单地从 Neo4j 中加载数据并且使用 Spark 分析的流程:

  1. Neo4j 中添加数据

    1
    2
    3
    4
    5
    6
    UNWIND range(1, 100) as id
    CREATE (p:Person {id: id}) WITH collect(p) as people
    UNWIND people as p1
    UNWIND range(1, 10) as friend
    WITH p1, people[(p1.id + friend) % size(people)] as p2
    CREATE (p1)->[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)
  2. Spark 中配置 Neo4j 的连接方式

    1
    2
    3
    spark.neo4j.bolt.url = bolt://<host>:<port>
    spark.neo4j.bolt.user = <username>
    spark.neo4j.bolt.password = <password>
  3. 打开 Spark 添加 jar

    1
    spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2

    如果还需要添加其他依赖的 jar 包,在后面依次添加即可,用逗号分隔。

  4. 加载数据转换为 SparkRDD 类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import org.neo4j.spark._
    val neo = Neo4j(sc)
    val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id").loadRowRdd
    rdd.count // 1000
    rdd.first.schema.fieldNames // ["id"]
    rdd.first.schema("id") // StructField(id, LongType, true)
    neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count // 10
    // 设置分区和批处理大小
    neo.nodes("MATCH (n:Person) RETURN id(n) SKIP {_skip} LIMIT {_limit}").partitions(4).batch(25).loadRowRdd.count // 100 == 4 * 25
    // 通过 pattern 加载数据
    neo.pattern("Person", Seq("KNOWS"), "Person").rows(80).batch(21).loadNodeRdds.count // 80
    // 通过 pattern 加载关系
    neo.pattern("Person", Seq("KNOWS"), "Person").partitions(12).batch(100).loadRelRdds.count // 1000
  5. 加载数据转换为 SparkDataFrame 类型

    1
    2
    3
    4
    import org.neo4j.spark._
    val neo = Neo4j(sc)
    neo.nodes("MATCH (n:Person) RETURN id(n) as id SKIP {_skip} LIMIT {_limit}").partitions(4).batch(25).loadDataFrame.count
    var df = neo.pattern("Person", Seq("KNOWS"), "Person").rows(12).batch(100).loadDataFrame // [id: gigint]
  6. 加载数据转换为 SparkGraphX Graph 类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import org.neo4j.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib._
    val neo = Neo4j(sc)
    val graphQuery = "MATCH (n:Person)-[:KNOWS]-(m:Person) RETURN id(n) as source, id(m) as target, type(r) as value SKIP {_skip} LIMIT {_limit}"
    val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(7).batch(200).loadGraph
    graph.vertices.count // 100
    graph.edges.count // 1000
    // 通过 pattern 加载 graph
    val graph = neo.patttern(("Person", "id"), ("KNOWS", "since"), ("Person". "id")).partitions(7).batch(200).loadGraph[Long, Long]
    val graph2 = PageRank.run(graph, 5)
    graph2.vertices.sort(_._2).take(3)
  7. 加载数据转换为 SparkGraphFrames 类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.neo4j.spark._
    import org.graphframes._
    val neo = Neo4j(sc)
    val graphFrame = neo.pattern(("Person", "id"), ("KNOWS", null), ("Person". "id")).partitions(3).rows(1000).loadGraphFrame
    graphFrame.vertices.count // 100
    graphFrame.edges.count // 1000
    val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
    val ranked = pageRankFrame.vertices
    ranked.printSchema()
    val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)

Neo4j 与自然语言处理

自然语言处理技术在挖掘文本数据时使用的关键技术之一是本体的挖掘词关联。词关联在语言处理标记、解析、实体提取等自然语言处理任务中非常有用,Neo4j 由于其关联数据的能力,为自然语言处理中词关联的处理提供了一种新的解决方案。
在自然语言中最常见的词关联就是聚合关系和组合关系。

计算聚合相关性

计算聚合关系的基本步骤分为三步:

  • 通过一个单词的上下文表示每个单词。
    考虑一个简单的文档,其中包含这样的单词:
    1
    2
    My cat eats fish on Saturday.
    His dog eats turkey on Tuesday.
    至此要分析两个词之间是否存在关联,那就需要使用一定的方法来表示给定单词的上下文。
    1
    2
    Right1("cat") = {"eats", "ate", "is", "has", ...}
    Left1("cat") = {"my", "his", "big", "a", "the", ...}
  • 计算上下文相关性
    为了计算聚合关系的度量,可以采用相对上下文相似性的和来表示,使用 Jaccard 指数作为相似性的度量。
    1
    Sim("Cat", "Dog") = Sim(Left1("Cat"), Left1("Dog")) + Sim(Right1("Cat"), Right1("Dog"))
  • 具有上下文高度相似性的词可能具有聚合关系
    一旦有了这种计算相似度的方式,就可以寻找具有高相似度的词对。其中可以通过扩大上下文窗口的大小,处理停止词来调整相似性得分。

将文本数据建模为邻接图

可以将文本数据建模为邻接图,其中每个单词是一个节点,两个节点之间的边表示这些单词在文本语料库中彼此相邻。
neo4j-9-1.jpg

挖掘单词之间的关系

根据插入的数据来发现单词之间的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
LEFT1_QUERY = 'MATCH (s:Word {word: {word}}) MATCH (w:Word)-[:NEXT_WORD]->(s) RETURN w.word as word'
RIGHT1_QUERY = 'MATCH (s:Word {word: {word}}) MATCH (w:Word)<-[:NEXT_WORD]-(s) RETURN w.word as word'

def left1(word):
params = {
'word': word.lower()
}
tx = graphdb.cypher.begin()
tx.append(LEFT1_QUERY, params)
results = tx.commit()
words = []
for result in results:
for line in result:
words.append(line.word)
return set(words)

def right1(word):
params = {
'word': word.lower()
}
tx = graphdb.cypher.begin()
tx.append(RIGHT1_QUERY, params)
results = tx.commit()
words = []
for result in results:
for line in result:
words.append(line.word)
return set(words)

def jaccard(a, b):
intSize = len(a.intersection(b))
unionSize = len(a.union(b))
return intSize / unionSize

def paradingSimilarity(w1, w2):
return (jeccard(left1(w1), left1(w2)) + jaccard(right1(w1), right1(w2))) / 2.0

通过上面的代码我们就可以计算单词之间的相似性:

1
paradingSimilarity("school", "university") // 0.2153846153846154

引用


个人备注

此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!