浅谈一下关系型数据库中json类型字段的处理

作者 : admin 本文共8491个字,预计阅读时间需要22分钟 发布时间: 2024-06-10 共2人阅读

文章目录

  • 背景
    • mysql json type
    • postgresql jsonb type
    • 场景说明
    • mysql实验案例
    • postgresql 实验案例
  • 总结

背景

最近涉及到在关系型数据库中解析json数据的问题,数据库表中有一个json类型的字段,业务场景涉及到展开单行json数据为*多行数据并和其他的表进行join查询. 以往只是知道当前主流的关系型数据库版本是支持存放json数据的,但并未深入研究过这些数据库支持json数据处理到什么样的程度.本文在两种关系行数据库(mysql以及postgresql)上进行实验然后进行总结,因为两者针对json字段处理提供的函数有较大出入,需要分开讨论.

mysql json type

参考mysql json datatype 以及 mysql json functions
其中 json_table函数是此次实验需要使用的函数 json table
这个函数本质上就是将稍显复杂的单行json 列展开成多行多列的table, 方便进行表级别的操作.
举个例子,将 [{"a":1, "b":2}, {"a":2, "b":3}] json array 变成a,b两列的表

select * from json_table('[{"a":1, "b":2}, {"a":2, "b":3}]', '$[*]' columns(
	a int path '$.a',
	b int path '$.b'
)) as t
a|b|
-+-+
1|2|
2|3|

如果待拆分的json数据来自某个表的json字段, 比如 test_json 表有一个test字段是json类型,里面存放如下的json数据

test              |
------------------+
[{"a": 1, "b": 2}]|
[{"a": 2, "b": 1}]|

现在把它拆分成a,b两列的新表

select a, b from 
test_json,
json_table(test_json.test, '$[*]' columns(
	a int path '$.a',
	b int path '$.b'
)) as t

结果

a|b|
-+-+
1|2|
2|1|

json_table还支持嵌套json解析,这个在当json字段存储的数据结构相对比较复杂的时候,发挥重要作用.比如往test_json表的test字段存入如下json数据

test                                                              |
------------------------------------------------------------------+
{"id": 1, "records": [{"id": 1, "data": 1}, {"id": 2, "data": 2}]}|
{"id": 2, "records": [{"id": 3, "data": 3}, {"id": 4, "data": 4}]}|

现在将其变成一个id, record_id 以及record_data 一行的结构, 使用如下sql

select t.id, record_id, record_data
from test_json,
json_table(test_json.test, '$' columns(
	id int path '$.id',
	nested path '$.records[*]' columns(
		record_id int path '$.id',
		record_data int path '$.data'
	)
)) as t

结果为

id|record_id|record_data|
--+---------+-----------+
 1|        1|          1|
 1|        2|          2|
 2|        3|          3|
 2|        4|          4|

postgresql jsonb type

pg对json数据存储提供了两种类型json以及jsonb
相比于mysql, postgresql提供的操作json类型数据的函数更多.
json type
json functions
官方推荐使用jsonb类型来存储json数据.
在诸多jsonb的方法中, jsonb_path_query可以根据jsonpath解析json数据并展开数据.它是本次实验需要使用的函数.

举个例子有如下测试json数据

select jsonb_path_query('{"content":[{"user_id":1, "score":0.7}, {"user_id":2, "score":0.8}]}', '$.content[*].user_id') as user_id,
jsonb_path_query('{"content":[{"user_id":1, "score":0.7}, {"user_id":2, "score":0.8}]}', '$.content[*].score') as score

展开成多行就可以得到如下的user_id和score两列的结果.

user_id|score|
-------+-----+
1      |0.7  |
2      |0.8  |

场景说明

假设使用关系型数据库存一个书籍推荐场景的数据, 定义如下三张表:
第一张推荐任务表 recommend_job

field_nametypedescription
idint一次推荐任务的id
resultjson当前推荐任务的结果
statusvarchar当前任务的状态:成功/失败/运行中

json 字段会存储基于物品的推荐结果, 样本数据如下:

{
	"recommend": [
			{
				"book_id": 1,
				"users": [
					{
						"user_id": 1,
						"score": 0.8
					},
					{
						"user_id": 2,
						"score": 0.7
					}
				]
			},
			{
				"book_id": 2,
				"users": [
					{
						"user_id": 3,
						"score": 0.6
					},
					{
						"user_id": 4,
						"score": 0.8
					}
				]
			},
			{
				"book_id": 3,
				// empty recommendation
				"users": []
			}
	],
	"meta": {
		"count": 100,
		"coverage": 0.7
	}
}

mysql ddl如下:

create table if not exists `recommend_job`(
	`id` bigint unsigned not null auto_increment comment 'id',
	`result` json comment 'recommend result',
	`status` varchar(64) not null comment 'recommend job status',
	`create_time` timestamp not null default current_timestamp comment 'create time',
  `update_time` timestamp null default null on update current_timestamp comment 'update time',
  primary key(`id`)
)engine=innodb default charset=utf8mb4 comment 'recommend job table';

pg ddl 如下

create table if not exists "recommend_job" (
	"id" serial not null,
	"result" jsonb,
	"status" varchar(64) not null,
	"create_time" timestamp not null default current_timestamp,
  "update_time" timestamp null,
	primary key("id")
)

第二张用户表user

field_nametypedescription
idint用户id
namevarchar用户名字
ageint用户年龄

mysql ddl如下:

create table if not exists `user`(
	`id` bigint unsigned not null auto_increment comment 'id',
	`name` varchar(255) not null comment 'username',
	`age` int unsigned not null comment 'user age',
	`create_time` timestamp not null default current_timestamp comment 'create time',
  `update_time` timestamp null default null on update current_timestamp comment 'update time',
  primary key(`id`)
)engine=innodb default charset=utf8mb4 comment 'user table';

postgresql ddl如下

create table if not exists "user" (
	"id" serial not null,
	"name" varchar(255) not null,
	"age" int not null,
	"create_time" timestamp not null default current_timestamp,
  "update_time" timestamp null default null,
  primary key("id")
)

第三张书籍表book

field_nametypedescription
idint书籍id
namevarchar书籍名字
authorvarchar书籍作者

mysql ddl如下:

create table if not exists `book`(
	`id` bigint unsigned not null auto_increment comment 'id',
	`name` varchar(255) not null comment 'book name',
	`author` varchar(255) not null comment 'book author',
	`create_time` timestamp not null default current_timestamp comment 'create time',
  `update_time` timestamp null default null on update current_timestamp comment 'update time',
  primary key(`id`)
)engine=innodb default charset=utf8mb4 comment 'book table';

postgresql ddl 如下:

create table if not exists "book" (
	"id" serial not null,
	"name" varchar(255) not null,
	"author" varchar(255) not null,
	"create_time" timestamp not null default current_timestamp,
  "update_time" timestamp null default null,
  primary key("id")
)

sample data数据注入db,生成脚本如下

# -*- coding:utf8 -*-
import pandas as pd
from sqlalchemy import create_engine
from pathlib import Path
import random, logging, os, math, json
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
random.seed(10)
def get_score():
return round(random.random(), 3)
def gen_recs(users, books, rec_count, ratio, job_id=1):
recommend = []
sp_books = random.sample(books, k = math.floor(len(books)*ratio))
for item in sp_books:
sp_users = random.sample(users, k=min(len(users), rec_count))
rec_users = [{"user_id": user[0], "score": get_score()} for user in sp_users]
recommend.append({
"book_id": item[0],
"users": rec_users
})
result = json.dumps({
"recommend": recommend,
"meta": {
"count": len(sp_books),
"coverage": ratio
}
})
return pd.DataFrame({
"id": [job_id],
"status": ["succeed"],
"result": [result]
})
if __name__ == '__main__':
users_file_path = Path('./users.txt')
books_file_path = Path('./books.txt')
users_df = pd.read_csv(users_file_path)
books_df = pd.read_csv(books_file_path)
res = gen_recs(users_df.values.tolist(), books_df.values.tolist(), 3, 0.8, 1)
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASS")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")
db_url = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url)
# users_df.to_sql("user", engine, if_exists='replace', index=False)
# books_df.to_sql("book", engine, if_exists='replace', index=False)
#res.to_sql("recommend_job", engine, if_exists='replace', index=False)
# save res to db
# res.to_sql("recommend_job", engine, if_exists="append", index=False)

users.txt

id,name,age
1,A,10
2,B,11
3,C,12
4,D,9
5,E,10
6,F,13
7,G,12
8,H,10
9,I,11
10,J,12

books.txt

id,name,author
1,红楼梦,曹雪芹
2,活着,余华
3,1984,乔治.奥威尔
4,哈利.波特,J.K.罗琳
5,三体,刘慈欣
6,百年孤独,加西亚.马尔克斯
7,飘,玛格丽特.米切尔
8,动物农场,乔治.奥威尔
9,房思琪的初恋乐园,林亦含
10,三国演义,罗贯中
11,福尔摩斯侦探全集,阿.柯南道尔
12,白夜行,东野圭吾
13,小王子,圣埃克苏佩里
14,安徒生童话故事集,安徒生
15,天龙八部,金庸

mysql实验案例

输出每一本书对应的推荐用户列表,book_id对应所有的推荐用户list

select book_id, users from
recommend_job,
json_table(recommend_job.result, '$.recommend[*]' columns(book_id int path '$.book_id', users json path '$.users')) as t;

结果

book_id|users                                                                                          
-------+-----------------------------------------------------------------------------------------------
10|[{"score": 0.328, "user_id": 1}, {"score": 0.25, "user_id": 9}, {"score": 0.953, "user_id": 8}]
1|[{"score": 0.86, "user_id": 6}, {"score": 0.603, "user_id": 1}, {"score": 0.382, "user_id": 7}]
7|[{"score": 0.175, "user_id": 5}, {"score": 0.303, "user_id": 10}, {"score": 0.363, "user_id": 8
8|[{"score": 0.613, "user_id": 8}, {"score": 0.044, "user_id": 4}, {"score": 0.004, "user_id": 10
15|[{"score": 0.536, "user_id": 3}, {"score": 0.772, "user_id": 4}, {"score": 0.24, "user_id": 5}]
......

这里我们还可以将嵌套的users展开和user表进行join得到最终的每一个用户推荐书籍和相应的score

with rec_tbl as (
select t.book_id, t.user_id, t.score from
recommend_job,
json_table(recommend_job.result, '$.recommend[*]' columns(
book_id int path '$.book_id',
nested path '$.users[*]' columns(
user_id int path '$.user_id',
score float path '$.score'
)
)
) t)
select 
user.id user_id,
book_id,
score,
user.name user_name,
user.age user_age
from 
user left join
rec_tbl
on user.id = rec_tbl.user_id

结果

user_id|book_id|score|user_name|user_age|
-------+-------+-----+---------+--------+
1|     10|0.328|A        |      10|
1|      1|0.603|A        |      10|
2|     12|0.567|B        |      11|
2|     13|0.442|B        |      11|
3|     15|0.536|C        |      12|
3|      4|0.238|C        |      12|
3|     13|0.773|C        |      12|
......

postgresql 实验案例

postgresql同样的业务场景下查询语句相比于mysql就更加简单

with rec_tbl as (
select 
id,
book_id,
cast (jsonb_path_query(users, '$[*].user_id') as int) user_id,
jsonb_path_query(users, '$[*].score') score
from
(select 
id,
jsonb_path_query(result, '$.recommend[*].book_id') book_id,
jsonb_path_query(result, '$.recommend[*].users') users
from
recommend_job) t
)
select 
user_id,
book_id,
score,
name user_name,
age user_age
from 
public.user left join
rec_tbl
on public.user.id = rec_tbl.user_id
order by user_id asc;

结果

user_id|book_id|score|user_name|user_age|
-------+-------+-----+---------+--------+
1|1      |0.603|A        |      10|
1|10     |0.328|A        |      10|
2|13     |0.442|B        |      11|
2|12     |0.567|B        |      11|
3|2      |0.773|C        |      12|
3|13     |0.773|C        |      12|
3|3      |0.865|C        |      12|
......

总结

mysql以及postgresql针对json类型字段提供的处理方法会有差异,因此使用的时候需要注意这个点.其次实际开发过程中我们也可以不用使用如下这个表结构存储推荐job的结果

file_nametype
idint
statusvarchar
resultjson

我们完全可以在定义推荐结果表的时候将book_id, user_id和score 也定义为字段,这样也不必做比较复杂的json字段解析.

field_nametype
idint
statusvarchar
book_idint
user_idint
scorefloat

当然这个看具体项目需求,不过即便是需要对复杂的json数据进行解析,看完本文也不存在障碍了.

本站无任何商业行为
个人在线分享 » 浅谈一下关系型数据库中json类型字段的处理
E-->