ignite集群部署及并置计算

kanten 发布于 2018/07/07 13:40
阅读 1K+
收藏 1

我按照ignite官方手册进行以下集群部署:本地cmd启动ignite(server);java启动ignite(server)。代码中执行数据并置计算,结果两个server启动的情况下出错,单ignite情况下正常,错误信息及代码如下,请大神们帮小白入门,感谢!

[13:38:04,949][SEVERE][sys-#55][GridTaskWorker] Failed to obtain remote job result policy for result from ComputeTask.result(..) method (will fail the whole task): GridJobResultImpl [job=C4 [r=my.igniteaffinity.igniteaffinity$2@269f4bad], sib=GridJobSiblingImpl [sesId=ebb2f337461-c6a27e51-a3ac-4760-af5c-8979b9b82809, jobId=0cb2f337461-c6a27e51-a3ac-4760-af5c-8979b9b82809, nodeId=52668a80-2aba-4227-9f9b-9fa1a1859dbb, isJobDone=false], jobCtx=GridJobContextImpl [jobId=0cb2f337461-c6a27e51-a3ac-4760-af5c-8979b9b82809, timeoutObj=null, attrs={}], node=TcpDiscoveryNode [id=52668a80-2aba-4227-9f9b-9fa1a1859dbb, addrs=[0:0:0:0:0:0:0:1, 127.0.0.1, 192.168.31.37, 2001:0:dcfa:40e1:3419:2ac0:3f57:e0da], sockAddrs=[windows10.microdone.cn/192.168.31.37:47500, /2001:0:dcfa:40e1:3419:2ac0:3f57:e0da:47500, /0:0:0:0:0:0:0:1:47500, /127.0.0.1:47500], discPort=47500, order=1, intOrder=1, lastExchangeTime=1530941883742, loc=false, ver=2.5.0#20180524-sha1:86e110c7, isClient=false], ex=class o.a.i.binary.BinaryInvalidTypeException: my.igniteaffinity.dao.CompanyAnnotation, hasRes=true, isCancelled=false, isOccupied=true]
class org.apache.ignite.IgniteException: Remote job threw user exception (override or implement ComputeTask.result(..) method if you would like to have automatic failover for this exception): my.igniteaffinity.dao.CompanyAnnotation
    at org.apache.ignite.compute.ComputeTaskAdapter.result(ComputeTaskAdapter.java:102)

代码如下:

package my.igniteaffinity;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteRunnable;

import my.igniteaffinity.dao.CompanyAnnotation;
import my.igniteaffinity.dao.PersonAnnotation;
import my.igniteaffinity.dao.PersonKey;

public class igniteaffinity {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try(Ignite ignite = Ignition.start("examples/config/example-ignite.xml")){
            try(IgniteCache<Object,Object> cache = ignite.getOrCreateCache("DEFAULT_CACHE")){
                CompanyAnnotation c1 = new CompanyAnnotation(1L, "1��˾");
                CompanyAnnotation c2 = new CompanyAnnotation(1L, "2��˾");
                CompanyAnnotation c3 = new CompanyAnnotation(1L, "3��˾");
                CompanyAnnotation c4 = new CompanyAnnotation(1L, "4��˾");
                cache.put(1L, c1);
                cache.put(2L, c2);
                cache.put(3L, c3);
                cache.put(4L, c4);

                PersonKey pk1 = new PersonKey(11L, 1L);
                PersonKey pk2 = new PersonKey(22L, 2L);
                PersonKey pk3 = new PersonKey(33L, 3L);
                PersonKey pk4 = new PersonKey(44L, 4L);
                PersonAnnotation p1 = new PersonAnnotation(pk1, "1��");
                PersonAnnotation p2 = new PersonAnnotation(pk2, "2��");
                PersonAnnotation p3 = new PersonAnnotation(pk3, "3��");
                PersonAnnotation p4 = new PersonAnnotation(pk4, "4��");
                cache.put(pk1, p1);
                cache.put(pk2, p2);
                cache.put(pk3, p3);
                cache.put(pk4, p4);
                //igniteCompute,��������Dz�affinityRun��
                igniteComputeEnjoy(ignite);
                //igniteCluster����������Dz���mapKeysToNodes��
                igniteClusterEnjoy(ignite);
            }
        }
    
    }

    private static void igniteClusterEnjoy(final Ignite ignite) {
        // TODO Auto-generated method stub
        ClusterGroup group = ignite.cluster().forCacheNodes("DEFAULT_CACHE");
        Affinity<Object> affinity = ignite.<Object>affinity("DEFAULT_CACHE");
        Map<ClusterNode, Collection<Object>> mappings = affinity.mapKeysToNodes(Arrays.asList(1L,2L,3L,4L));
        for (Entry<ClusterNode, Collection<Object>> mapping : mappings.entrySet()) {
            ClusterNode node = mapping.getKey();
            final Collection<Object> mappedKeys = mapping.getValue();

            if (node != null) {
                // ����������������ڵĽڵ�
                ignite.compute(ignite.cluster().forNode(node)).run(new IgniteRunnable() {
                    @Override public void run() {
                        IgniteCache<Object, Object> cache = ignite.cache("DEFAULT_CACHE");
                        for (Object key : mappedKeys) {
                            long parseLong = Long.parseLong(key.toString());
                            System.out.println("ԭ���� [key= " + key +
                                    ", value=" + cache.localPeek(key) + ']');
                            PersonAnnotation ppp = (PersonAnnotation)cache.localPeek(new PersonKey(parseLong*11, parseLong));
                            System.out.println("�׺ͼ�  [key= " + key +
                                    ", value=" + ppp.getName()  + ']');
                        }
                    }
                });
            }
        }
    }

    private static void igniteComputeEnjoy(Ignite ignite) {
        // TODO Auto-generated method stub
        //��ȡ�������
        final IgniteCache<Object, Object> cache = ignite.getOrCreateCache("DEFAULT_CACHE");
        IgniteCompute compute = ignite.compute();
        for(int i = 1; i < 5;i++){
            final Long key = (long) i;
            compute.affinityRun("DEFAULT_CACHE", key, new IgniteRunnable(){
                @Override
                public void run() {
                    System.out.println("====>" + key);
                    CompanyAnnotation peekValue = (CompanyAnnotation) cache.localPeek(key,CachePeekMode.ALL);
                    System.out.println("peekValue" + peekValue);
                    System.out.println("��ȡ���˹�˾��" + peekValue.getCompanyName());
                    PersonAnnotation person = (PersonAnnotation) cache.localPeek(new PersonKey(key*11, key), CachePeekMode.ALL);
                    System.out.println("��ȡ��" + peekValue.getCompanyName() + "��Ա��" + person.getName());
                }

            });
        }
    }
}

ignite配置使用example-ignite,其中
        <property name="peerClassLoadingEnabled" value="true"/>

请大神帮忙指导一下,多谢

加载中
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部