原有的事务支持使用MemcachedState来进行,现在需要将其迁移至Redis,并且需要记录所有key值列表,因为在redis中虽然可以使用keys *操作,但不是被推荐的方式,所以把所有结果存在Redis中的一个HASH格式字段中。
关于Redis与Storm集成的相关文档,可以参考:
由于Redis中也有着较多种类型的数据结构,这也为我们提供了可能,将所有的key至统一放置到set中,或其他更为合适的数据结构中。
搭建启动Redis
目前,分配过来的4台服务器,只有135剩余内存较多,分出1G用来作为Redis存储使用,搭建一台单机Redis服务,用于记录所有的查询日志。
启动该服务:
sudo bin/redis-server conf/redis.6388.conf
Storm集成Redis
添加maven依赖:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> </dependency>
对于正常的Bolt来说,storm-redis提供了基本的bolt实现,RedisLookupBolt和RedisStoreBolt,
其中使用了策略模式,将实际要查询/保存相关的key设置以及策略放到了RedisLookup/StoreMapper中,在LookupBolt和StoreBolt中进行实际的查找、保存操作,根据RedisDataType的不同,支持Redis的各种数据类型:STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG。
从对应传输过来的Tuple中查找、保存相应字段的值,在RedisLookupBolt中,根据不同的key值,从key值/或者additionalKey中使用不同的方法来get得到对应的值。
@Override public void execute(Tuple input) { String key = lookupMapper.getKeyFromTuple(input); Object lookupValue; JedisCommands jedisCommand = null; try { jedisCommand = getInstance(); switch (dataType) { case STRING: lookupValue = jedisCommand.get(key); break; case LIST: lookupValue = jedisCommand.lpop(key); break; case HASH: lookupValue = jedisCommand.hget(additionalKey, key); break; case SET: lookupValue = jedisCommand.scard(key); break; case SORTED_SET: lookupValue = jedisCommand.zscore(additionalKey, key); break; case HYPER_LOG_LOG: lookupValue = jedisCommand.pfcount(key); break; default: throw new IllegalArgumentException("Cannot process such data type: " + dataType); } List<Values> values = lookupMapper.toTuple(input, lookupValue); for (Values value : values) { collector.emit(input, value); } collector.ack(input); } catch (Exception e) { this.collector.reportError(e); this.collector.fail(input); } finally { returnInstance(jedisCommand); }
Redis TridentState支持
此外,storm-redis中还支持trident state:
RedisState and RedisMapState, which provide Jedis interface just for single redis. RedisClusterState and RedisClusterMapState, which provide JedisCluster interface, just for redis cluster.
由于我们使用的是single redis模式(非集群),在下面的UML图中会有所体现:
使用RedisDataTypeDescription来定义保存到Redis的数据类型和额外的key,其中支持两种数据类型:STRING和HASH。如果使用HASH类型,则需要定义额外的key,因为hash属于两层的,我们定义的additionalKey为最外层的key类型。
例如我们需要保存结果至Redis的Hash数据结构中,则需要定义RedisDataTypeDescription.RedisDataType.HASH,定义hash的key:"controller:5min”,根据key进行group by操作,当前使用非事务型(对数据正确性敏感度不高)。
Options<Object> fiveMinitesOptions = new Options<>(); fiveMinitesOptions.dataTypeDescription = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "controller:5min"); logStream.each(new Fields("logObject"), new Log5MinGroupFunction(), new Fields("key")) .groupBy(new Fields("key")) .persistentAggregate(RedisMapState.nonTransactional(poolConfig, fiveMinitesOptions), new Fields("logObject"), new LogCombinerAggregator(), new Fields("statistic"));
最后在Redis中保存的值为:
controller:5min Log5MinGroupFunction生成的key,LogCombinerAggregator合并完成后的value;
Log5MinGroupFunction生成的key会经过KeyFactory.build(List<Object> key)方法转换,可以考虑自定义生成的key;最终的value会通过Serializer的序列化以及反序列化方法转换成byte[]存放至Redis中,默认是通过JSON的格式。
在AbstractRedisMapState中,对于传过来的keys进行统一KeyFactory.get操作,而实际获取值和持久化值是通过 retrieveValuesFromRedis以及updateStatesToRedis两个方法来实现的
@Override public List<T> multiGet(List<List<Object>> keys) { if (keys.size() == 0) { return Collections.emptyList(); } List<String> stringKeys = buildKeys(keys); List<String> values = retrieveValuesFromRedis(stringKeys); return deserializeValues(keys, values); } private List<String> buildKeys(List<List<Object>> keys) { List<String> stringKeys = new ArrayList<String>(); for (List<Object> key : keys) { stringKeys.add(getKeyFactory().build(key)); } return stringKeys; } @Override public void multiPut(List<List<Object>> keys, List<T> vals) { if (keys.size() == 0) { return; } Map<String, String> keyValues = new HashMap<String, String>(); for (int i = 0; i < keys.size(); i++) { String val = new String(getSerializer().serialize(vals.get(i))); String redisKey = getKeyFactory().build(keys.get(i)); keyValues.put(redisKey, val); } updateStatesToRedis(keyValues); }
在RedisMapState中,从Redis中获取值的方法:
@Override protected List<String> retrieveValuesFromRedis(List<String> keys) { String[] stringKeys = keys.toArray(new String[keys.size()]); Jedis jedis = null; try { jedis = jedisPool.getResource(); RedisDataTypeDescription description = this.options.dataTypeDescription; switch (description.getDataType()) { case STRING: return jedis.mget(stringKeys); case HASH: return jedis.hmget(description.getAdditionalKey(), stringKeys);
可以看出,支持两种类型STRING以及HASH,可以通过批量获取的API获取多个keys值,update的过程也比较类似,如果是STRING类型,通过pipeline的方式(分布式不支持)可以极大提高查找效率;如果为hash类型,直接通过hmget即可。
protected void updateStatesToRedis(Map<String, String> keyValues) { Jedis jedis = null; try { jedis = jedisPool.getResource(); RedisDataTypeDescription description = this.options.dataTypeDescription; switch (description.getDataType()) { case STRING: String[] keyValue = buildKeyValuesList(keyValues); jedis.mset(keyValue); if(this.options.expireIntervalSec > 0){ Pipeline pipe = jedis.pipelined(); for(int i = 0; i < keyValue.length; i += 2){ pipe.expire(keyValue[i], this.options.expireIntervalSec); } pipe.sync(); } break; case HASH: jedis.hmset(description.getAdditionalKey(), keyValues); if (this.options.expireIntervalSec > 0) { jedis.expire(description.getAdditionalKey(), this.options.expireIntervalSec); } break;
相关推荐
卡夫卡风暴Redis 该项目实施了一种在实现螺栓状态的方法。 ###项目使用以下开源项目:### ###解释### ... ####Bolts#### Redis用作内存数据库来存储螺栓的中间状态。 该项目为具有容错状态的螺
项目框架storm-parent: jar和插件依赖工程storm-dao: 数据接口层storm-redis: 数据缓存层或消息队列层storm-analysis: 基于storm的数据分析层storm-web: API接口服务层storm-core: 核心业务层备注: 框架持续更新中。...
数据管道指南(使用 Apache Storm)该项目侧重于将 Apache Storm/Trident 与 Java 结合使用。 有关在没有Storm 的... 使用 Zookeeper 存储状态:当前示例将状态存储在 Redis 缓存中。 我们计划用 Zookeeper 替换它。
storm-contrib是用于Storm的模块的社区存储库。 这些包括与其他系统(Redis,Kafka,MongoDB等)集成的各种喷口/螺栓,以及Storm开发人员遇到的常见任务的代码。 有关Storm本身的更多信息,请参见。 组织 storm-...
该示例使用 node.js 应用程序来跟踪用户产品页面的点击次数,并使用 Redis 来存储和检索结果。 此示例适用于 Storm 示例 Web 应用程序,并且需要运行 Web 应用程序、Redis Spring XD(单节点)和 Spring XD shell。...
采用Storm实时数据流引擎进行数据实时计算,并应用MapReduce、Spark实现批处理计算和内存计算,解决高频时序数据存储与海量数据计算问题;采用Hadoop分布式文件系统(HDFS)实现文件的可靠存储,并采用HBase分布式...
⼤数据技术体系 ⽂件存储:Hadoop HDFS、Tachyon、KFS 离线计算:Hadoop MapReduce、Spark 流式、实时计算:Storm、Spark Streaming、S4、Heron K-V、NOSQL数据库:HBase、Redis、MongoDB 资源管理:YARN、Mesos ⽇...
Web 控制台是使用 Cuba 微框架、Redis、Ohm(Redis 的对象哈希映射库)创建的。用于测试的最可爱的 abd Capybara。 为什么是古巴? 我们想到的第一个答案是为什么不呢? 简单、小巧,我们认为它非常适合这个项目,...
Storm在此之前是standalone模式的部署方式,Flink由于其现在运行的环境,美团选择的是OnYARN模式,除了计算引擎之外,我们还提供一些实时存储功能,用于存储计算的中间状态、计算的结果、以及维度数据等,目前这一类...
(storm、kafka、Redis Cluster) 2、数据实时清洗/存储 3、营销平台应用开发(事件捕获、处理) 南京流处理xx运营商测试项目(2015年9月-2015年10月) 负责部分:数据分发存储(根据数据属性,实时将每一条数据分发...
拓扑简单地从输入喷嘴中读取单词流,并存储每个单词在redis服务器中的出现次数, redis服务器是键值缓存和存储。 在[1]中阅读有关redis更多信息。 输入喷口发出两批。 一批包含单词[ apple , ball , cat , dog
数据存储:分布式文件系统使用HDFS,分布式数据库使用HBase,Mongodb、Elasticsearch,内存数据库使用redis;数据计算:使用Hive、MR、HiveSQL、ETL开发离线计算系统;使用storm、flink、spark streaming开发实时...
将生成的标签存储在Redis数据库中。实时营销系统的核心是通过分析电信用户DPI(Deep Packet Inspection,深度包检测)数据、挖掘用户上网特征、添加用户标签、发现目标用户,并采用个性化插件包对用户提供个性化的营销...
Redis 开源的⽀持⽹络,基于内存可持久化⽇志,key-value数据库,可⽤于 数据库 缓存 消息中间件 Neo4j 开源⾼性能的NoSQL图形数据库 7 数据处理 MapReduce 分布式离线的计算框架 批处理 ⽇渐被spark和flink取代 ...
常见的分布式系统存储解决方案,包括MySQL的分布式扩展、HBase的API及使用场景、Redis的使用等。 如何使用分布式消息系统ActiveMQ来降低系统之间的耦合度,以及进行应用间的通信。 垂直化的搜索引擎在分布式...
比方,电商会使用传统的关系型数据库MySQL和Oracle等来存储每一笔事务数据,除此之外,Redis和MongoDB这样的NoSQL数据库也常用于数据的采集 在大数据的采集过程中,其主要特点和挑战是并发数高,因为同时有可能会有...
协议、架构、存储、缓存、安全 Bootstrap MySQL MongoDB Redis* Spark Hive Hadoop Storm 大数据开发 分布式计算平台 Scrapy Linux 下开发 NginX tomcat Rabbitmq WSGI 缓存 消息队列MQ 负载均衡LB memcached 网络...
storm, kafka stream等, 做批处理需要掌握hadoop, hive等, 数据存储需要掌握 hdfs, hbase, redis, tsdb,es等, 各个公司会基于这些组件针对部门特有的业务进行二次开发 大数据开发岗位主要考察以下几个理论知识: 熟悉...
适合用于支持数据和信息的查询,但数据的再处理度不高,具有计算并发度高 、数据规模大、结果可靠性较高的特点。通常使用分布式数据处理提高数据规模、使用 内存数据进行计算过程缓冲和优化。本平台主要采用Spark ...
比如 ,电商会使用传统的关系型数据库MySQL和Oracle等来存储每一笔事务数据,除此之外, Redis和MongoDB这样的NoSQL数据库也常用于数据的采集。 在大数据的采集过程中,其主要特点和挑战是并发数高,因为同时有可能...