博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
S3上备份的json文件转存成parquet文件
阅读量:5976 次
发布时间:2019-06-20

本文共 2509 字,大约阅读时间需要 8 分钟。

背景:

     大量falcon 监控数据打到kinesis,然后把kinesis内的数据以json格式实时备份到s3上(临时备份),为了降低成本,减少S3空间占用以及后期数据分析,计划把s3上的json文件转储成parquet文件。

使用服务:glue(脚本作业)+lambda(定时触发glue脚本)+cloudwatch events(定时器)

glue脚本:

import sysfrom awsglue.transforms import *from pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.utils import getResolvedOptionsfrom pyspark.sql.types import TimestampType,DateTypefrom awsglue.job import Jobimport boto3import datetimeimport timeimport logginglogging.basicConfig()logger = logging.getLogger()logger.setLevel(logging.INFO)## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME'])sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)start_hour_before = 1end_hour_before = 1source_bucket_name = "backuptemp"target_bucket_name = "kinesis"target_prefix_name = "parquet_alertlog"delimiter = "/"default_region = "us-west-2"crawler_name = "clean_alertlog"client = boto3.client('s3')def delete_object(bucket_name,key_name):    try:        response = client.delete_object(Bucket=bucket_name,Key=key_name)    except Exception as e:        print str(e)        #email_alert("error when delete_object %s/%s" % (bucket_name, key_name))        def aggragate_files(date,key_name):    logger.info("start aggragate %s, time is %s." % (key_name, time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))))    if key_name == "":        return    try:        dataframe = spark.read.json("s3://%s/%s" % (source_bucket_name,key_name))        print("dataframe.....................",dataframe)        dataframe.write.parquet("s3://%s/%s/dt=%s" % (target_bucket_name, target_prefix_name, date), mode="append")        logger.info("finish aggragate %s, time is %s." % (key_name, time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))))    except Exception as e:        #email_alert("error when aggragate %s/%s/%s: %s." % (key_name, date, hour, str(e)))        print str(e)    else:        delete_object(source_bucket_name,key_name)        def main():    s3 = boto3.resource('s3')    process_slot = datetime.datetime.now() - datetime.timedelta(days=start_hour_before)    bucket = s3.Bucket(source_bucket_name)    dt = process_slot.strftime("%Y-%m-%d")    for obj in bucket.objects.all():        aggragate_files(dt,obj.key)        main()####commit jobjob.commit()

注释:

1、循环s3桶中对象文件,迭代转储为parquet文件,转储成功后删除原json文件

2、保存parquet文件时,采用mode="append"模式。

 

转载于:https://www.cnblogs.com/husbandmen/p/9475616.html

你可能感兴趣的文章
solr的suggest模块
查看>>
2PHP页面缓存
查看>>
菜鸟学Linux命令:bg fg jobs命令 任务管理
查看>>
【Linux系统编程】 Linux系统调用概述
查看>>
SQL Server Reporting Services:无法检索应用程序文件。部署中的文件已损坏
查看>>
hive中partition如何使用
查看>>
查看mysql数据库版本方法总结
查看>>
大牛手把手教你做日历(建议你看看,你会有收获的)
查看>>
Django中的ORM
查看>>
iOS开发UI篇—Quartz2D使用(图片剪切)
查看>>
spring学习笔记(20)数据库事务并发与锁详解
查看>>
关于Simple_html_dom的小应用
查看>>
鲁肃:蚂蚁金服的三个梦想
查看>>
华为程序员:加6天班!加班费1.4万元!网友:我能加到它破产
查看>>
Linux入门基础之grep命令详解及正则表达式
查看>>
Linux之Find命令详解
查看>>
crysis2 video&cryengine3 editor show
查看>>
Hibernate学习之SessionFactory的opensession 和 getCu...
查看>>
web网站服务(二)
查看>>
【第一期】网站打开错误问题解决方法集合
查看>>