糖果派对官方网站_可以赌钱的糖果游戏_手机版
Ignite CS 形式 java初探

Ignite CS 形式 java初探

作者:网络编程    来源:未知    发布时间:2020-02-13 10:25    浏览量:

命令行的花样扩张集群,是连连的充实Server,在事实上的重重职业场景中,要求我们用Ignite实现CS模式,本文将提供详实的代码。

文化准备

不停查询语言(CQL, continuous query language)相近于:内部存款和储蓄器数据库+视图+触发器 的消除方案。简单来说,意气风发有契合条件的靶子步入查询结果集,就实行一回回调函数。

本文的贯彻是依靠C/S方式的,即Client端先遵照一定准绳从Server端查询数据,重回结果集后,Server端继续增进切合条件的数目,Client端依旧可以实时查询再次回到结果。持续查询能够监听缓存中数量的改变。持续查询一旦运维,如若有,就能够收到相符查询条件的数据变化的布告。

  • github地址:https://github.com/netty/netty
  • 官网:http://netty.io
  • Demo地址:
    • https://github.com/jiutianbian/java_learn/tree/master/NettyServerTest
    • https://github.com/jiutianbian/java_learn/tree/master/NettyClinetTest

主要maven依赖:

 <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> <version>${ignite.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-spring</artifactId> <version>${ignite.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-indexing</artifactId> <version>${ignite.version}</version> </dependency> <properties> <ignite.version>2.3.0</ignite.version> </properties>

主要maven依赖:

 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> <version>${ignite.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-spring</artifactId> <version>${ignite.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-indexing</artifactId> <version>${ignite.version}</version> </dependency> <properties> <ignite.version>2.4.0</ignite.version> </properties>

TIPS:本工程使用的ignite的本子是2.4.0,ignite新陈代谢超级快,版本见得差距照旧异常的大的。

一、Netty Server端

  • IDE和创设筑工程具:eclipse meaven
  • Netty版本号:5.0.0.Alpha2

Server端实现:

import com.jc.searchengine.po.Person;import org.apache.ignite.Ignite;import org.apache.ignite.IgniteCache;import org.apache.ignite.IgniteCompute;import org.apache.ignite.Ignition;import org.apache.ignite.cache.CacheMode;import org.apache.ignite.cluster.ClusterGroup;import org.apache.ignite.configuration.CacheConfiguration;import org.apache.ignite.configuration.IgniteConfiguration;import java.util.Set;/** * @Author: wangjie * @Description: * @Date: Created in 10:20 2018/3/23 */public class Application { public static void main(String[] args) { Ignite ignite = Ignition.start(); CacheConfiguration cacheCfg = new CacheConfiguration(); cacheCfg.setName("serverCache"); cacheCfg.setCacheMode(CacheMode.PARTITIONED); IgniteCache<Long,Person> cache = ignite.getOrCreateCache; ClusterGroup clusterGroup = ignite.cluster().forServers(); cache.put(1L,new Person(1,"Sofiya",2)); cache.put(2L,new Person(1,"Sofiya",2)); cache.put(3L,new Person(1,"Sofiya",666666666)); } }

Server端的代码实质上第黄金年代做了2个操作:

  • 始建二个名字为“serverCache”的缓存区
  • 在缓存区插入数据

重视代码完毕

github源代码

package xx.xx.searchengine;import org.apache.ignite.Ignite;import org.apache.ignite.IgniteCache;import org.apache.ignite.Ignition;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RestController;/** * @Author: wangjie * @Description: * @Date: Created in 10:13 2018/3/27 */@SpringBootApplication@RestControllerpublic class ServerApplication { //cache name private static final String CACHE_NAME = "serverCache"; private static Ignite ignite = Ignition.start("example-cache.xml"); public static void main(String[] args) throws InterruptedException { SpringApplication.run(ServerApplication.class,args); } @RequestMapping(value = "/testIgnite",method = RequestMethod.GET) public String testIgnite(Integer key,String value) throws InterruptedException{ ignite.active; System.out.println("*******insert data begins*********"); try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){ cache.put(key,value); Thread.sleep; } return "*******insert data succeed*********"; }}

 <property name="clientMode" value="false"/> <property name="peerClassLoadingEnabled" value="true"/>

github源代码

package xx.xx.searchengine;import org.apache.ignite.Ignite;import org.apache.ignite.IgniteCache;import org.apache.ignite.Ignition;import org.apache.ignite.cache.query.ContinuousQuery;import org.apache.ignite.cache.query.QueryCursor;import org.apache.ignite.cache.query.ScanQuery;import org.apache.ignite.lang.IgniteBiPredicate;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.cache.Cache;import javax.cache.event.CacheEntryEvent;import javax.cache.event.CacheEntryUpdatedListener;/** * @Author: wangjie * @Description: * @Date: Created in 10:26 2018/3/27 */@SpringBootApplicationpublic class ClientApplication { //cache name private static final String CACHE_NAME = "serverCache"; public static void main(String[] args) throws InterruptedException { SpringApplication.run(ClientApplication.class, args); try (Ignite ignite = Ignition.start("example-cache.xml")) { ignite.active; System.out.println("**********Cache continuous query example started**********"); try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) { // Create new continuous query. ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); //init query qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() { @Override public boolean apply(Integer key, String val) { return key > 0; } })); //set local listener qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) { System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue; } } }); try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query { // Iterate through existing data. for (Cache.Entry<Integer, String> e : cur) { System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue; Thread.sleep(2000000000); } } finally { ignite.destroyCache(CACHE_NAME); } } } }}
  • 伊始化查询当要推行不断查询时,在将持续查询注册在集群中以至开首接到更新早先,能够有选取地钦定叁个开头化查询。最初化查询能够经过ContinuousQuery.setInitialQuery方法开展设置,并且能够是随意查询类型,包罗扫描查询,SQL查询和文书查询。
  • 长距离过滤器这么些过滤器在给定键对应的主和备节点上实行,然后评估更新是还是不是要求作为五个事件传播给该查询的地头监听器。借使过滤器重临true,那么本地监听器就能够收下通告,不然事件会被忽视。产生更新的特定主和备节点,会在主/备节点以至应用端推行的地点监听器之间,裁减不供给的互联网流量。远程过滤器能够透过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>卡塔尔方法开展设置。
  • 本地监听器当缓存被改换时(一个不成方圆被插入、更新大概去除),更新对应的平地风波就能发送给持续查询的地点监听器,之后选用就能够做出相应的感应。当事件经过了中间隔过滤器,他们就能被发送给客商端,通告何地的当地监听器。本地监听器是经过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>卡塔尔(قطر‎方法设置的。
 <property name="clientMode" value="true"/> <property name="peerClassLoadingEnabled" value="true"/>

1. 新建工程,通过meaven导入Netty的库包

找到工程中的pom.xml文件,在dependencies中增加如下代码

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Client 端实现:

import com.jc.searchengine.po.Person;import org.apache.ignite.*;import org.apache.ignite.cache.CacheMode;import org.apache.ignite.cache.query.ContinuousQuery;import org.apache.ignite.cache.query.ScanQuery;import org.apache.ignite.cluster.ClusterGroup;import org.apache.ignite.configuration.CacheConfiguration;import org.apache.ignite.configuration.IgniteConfiguration;import org.apache.ignite.internal.util.lang.IgnitePredicate2X;import org.apache.ignite.lang.IgniteBiPredicate;import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;import javax.cache.Cache;import java.time.Period;import java.util.Arrays;/** * @Author: wangjie * @Description: * @Date: Created in 10:55 2018/3/23 */public class Application{ public static void main(String[] args) { //此节点配置为客户端 Ignition.setClientMode; IgniteConfiguration cfg = new IgniteConfiguration(); TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); commSpi.setSlowClientQueueLimit; TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); TcpDiscoveryVmIpFinder ipfinder = new TcpDiscoveryVmIpFinder(); ipfinder.setAddresses(Arrays.asList("localhost")); discoverySpi.setIpFinder; cfg.setCommunicationSpi; cfg.setDiscoverySpi(discoverySpi); Ignite ignite = Ignition.start; ClusterGroup clusterGroup = ignite.cluster().forClients(); IgniteCache<Long,Person> cache = ignite.getOrCreateCache("serverCache"); IgniteCompute compute = ignite.compute(clusterGroup); compute.broadcast -> { Person s1 = cache.get; Person s2 = cache.get; Person s3 = cache.get; System.out.println(s1.toString() + " " + s2.toString() + " " + s3.toString; }}

Client端的代码做了以下职业:

  • 将此节点配置为客商端形式
  • 收获到刚刚Server段创制的“serverCache”缓存区
  • 将Server端的数量呈现出来

分别运营Server端和Client端的主次,查看调节台:

运转程序,测量试验三番四次查询

bb电子糖果派对 1Server.pngbb电子糖果派对 2Client.png

"

bb电子糖果派对 3postman.pngbb电子糖果派对 4bb电子糖果派对,Client-updata.png

有关ignite的任何小说:Ignite CS 情势 java初探Ignite 之计算运用的 Hello world

前后相继媛小白风度翩翩枚,如有错误,烦请商量指正!

2. 创建NettyServer

新建NettyServer类

package com.jiutianbian.NettyTest;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    private int port;

    public NettyServer(int port) {
        this.port = port;
        bind();
    }

    private void bind() {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 连接数
            bootstrap.option(ChannelOption.TCP_NODELAY, true); // 不延迟,消息立即发送
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 长连接
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel)
                        throws Exception {
                    ChannelPipeline p = socketChannel.pipeline();
                    p.addLast(new NettyServerHandler());// 添加NettyServerHandler,用来处理Server端接收和处理消息的逻辑
                }
            });
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isSuccess()) {
                System.err.println("启动Netty服务成功,端口号:" + this.port);
            }
            // 关闭连接
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            System.err.println("启动Netty服务异常,异常信息:" + e.getMessage());
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServer(10086);
    }
}

上一篇:没有了
友情链接: 网站地图
Copyright © 2015-2019 http://www.tk-web.com. bb电子糖果派对有限公司 版权所有