Next             Up                Back                   Contents

Επόμενο:Aσκήσεις Πάνω: Κεφάλαιο 8o: Προγράμματα περάσματος μηνυμάτων Πίσω: Αναφορές


 

ΠΡΟΓΡΑΜΜΑΤΙΣΤΙΚΕΣ ΕΡΓΑΣΙΕΣ

 

1. Ταξινόμηση σε τοπολογία Γραμμής

 

Η Ταξινόμηση με Αντικατάσταση είναι ένας αλγόριθμος τύπου διασωλήνωσης στον οποίο οι τιμές των δεδομένων κινούνται και προς τις δυο κατευθύνσεις κατά μήκος της διασωλήνωσης της διεργασίας. Οι μικρές τιμές κινούνται στα αριστερά και οι μεγάλες στα δεξιά, καταλήγοντας έτσι στην τελική ταξινόμηση των τιμών. Αρχικά, σε κάθε διεργασία έχει ανατεθεί μια ομάδα τιμών από την μη-ταξινομημένη λίστα. Κάθε διεργασία επαναλαμβάνει δύο φάσεις υπολογισμών: μια αριστερά κινούμενη φάση και μια δεξιά κινούμενη φάση. Κατά τη διάρκεια της αριστερά κινούμενης φάσης κάθε διεργασία στέλνει τη μικρότερη τιμή από τη δική της ομάδα στον αριστερό γείτονα και στη συνέχεια λαμβάνει την εισερχόμενη τιμή από τον δεξιό γείτονα. Κατά τη διάρκεια της δεξιά κινούμενης φάσης κάθε διεργασία στέλνει τημεγαλύτερη τιμή της ομάδας της στο δεξιό γείτονα και λαμβάνει την εισερχόμενη τιμή από τον αριστερό γείτονα.

Για την αύξηση της αποδοτικότητας είναι χρήσιμο για κάθε διεργασία να διατηρεί την ομάδα δεδομένων της ταξινομημένη κατά τη διάρκεια του αλγόριθμου. Αφότου κάθε διεργασία λάβει μια νέα τιμή από κάποιο γείτονα, πρέπει αυτή να ολοκληρωθεί στη δική της εσωτερική ομάδα. Αν στην αρχική λίστα υπάρχουν n τιμές τότε για την ταξινόμηση της λίστας απαιτούνται n/2 αριστερά-κινούμενες και δεξιά-κινούμενες φάσεις. Η εργασία αφορά το γράψιμο αυτού του Παράλληλου αλγορίθμου Ταξινόμησης με Αντικατάσταση. Το μέγεθος κάθε τοπικής ομάδας θα δηλώνεται στο πρόγραμμα από τη σταθερά groupsize, που θα καθορίζεται στην αρχή. Μπορείτε να υποθέσετε ότι το μέγεθος n της αρχικής λίστας διαιρείται ίσα από την groupsize, έτσι ώστε όλες οι διεργασίες να έχουν ίσα μεγέθη ομάδων.

Γράψτε ένα πρόγραμμα για αυτόν τον αλγόριθμο ταξινόμησης σε τοπολογία LINE σε σύστημα κατανεμημένης μνήμης. Εκτελέστε το πρόγραμμα για να προσδιορίσετε την επιτάχυνση του προγράμματος και το χρόνο εκτέλεσης για διαφορετικές τιμές της βασικής καθυστέρησης επικοινωνίας στην αρχιτεκτονική LINE. Επίσης σχεδιάστε ένα γράφημα απόδοσης του προγράμματος που θα παρουσιάζει το χρόνο εκτέλεσης σε σχέση με τη βασική καθυστέρηση επικοινωνίας.

 

 

2. Αριθμητική Ολοκλήρωση σε Πλέγμα

 

Μετατρέψτε το πρόγραμμα της αριθμητικής ολοκλήρωσης του σχήματος 5.8 έτσι ώστε να εκτελείται σε τοπολογία Διδιάστατου Πλέγματος σε σύστημα κατανεμημένης μνήμης. Θα απαιτηθεί μια νέα τεχνική πρόσθεσης των τοπικών αθροισμάτων, η οποία είναι περισσότερο κατάλληλη για συστήματα κατανεμημενης μνήμης. Προσπαθήστε να σχεδιάσετε το πρόγραμμα ώστε να αποδίδει καλά ειδικά σε Διδιάστατο Πλέγμα.

Για να ελέγξετε το πρόγραμμά σας χρησιμοποιείστε για την ολοκλήρωση την ακόλουθη συνάρτηση:

 

FUNCTION f(t: REAL): real;


BEGIN


    f:= Sqrt(4-t*t);


END;

 

Πραγματοποιείστε την επανάληψη μεταξύ των οριακών σημείων a=0 και b=2. Το αποτέλεσμα αυτού του επαναληπτικού προγράμματος πρέπει να είναι μια καλή προσέγγιση της τιμής του pi.

 

3. Επίλυση γραμμικών εξισώσεων σε Υπερκύβο

 

Θεωρείστε ένα σύστημα n γραμμικών εξισώσεων που προσδιορίζεται από έναν πίνακα συντελεστών A και έναν πίνακα B. Κάθε εξίσωση i αυτού του συστήματος έχει την ακόλουθη μορφή:

 

A[i, 1]x[1]+A[i, 2]x[2]+...+A[i, n]x[n]=B[i]

 

Ο σκοπός είναι ο προσδιορισμός των τιμών του πίνακα x που ικανοποιούν όλες τις εξισώσεις. Αν όλες οι τιμές του x είναι γνωστές εκτός από την x[i], τότε η παραπάνω εξίσωση μπορεί να χρησιμοποιηθεί για τον υπολογισμό του x[i] με τον ακόλουθο τρόπο:

 

image

Αν είναι γνωστές μόνο κάποιες κατά προσέγγιση εκτιμήσεις για τις άλλες τιμές του x, τότε η παραπάνω εξίσωση μπορεί να χρησιμοποιηθεί για τον υπολογισμό μιας εκτίμησης της τιμής του x[i]. Δεδομένου ενός ολοκληρωμένου συνόλου προσεγγιστικών τιμών του x, η παραπάνω τεχνική μπορεί να εφαρμοστεί σε κάθε στοιχείο του x ταυτόχρονα για τον υπολογισμό ενός νέου συνόλου τιμών του x. Για μια μεγάλη ποικιλία γραμμικών συστημάτων η επαναλαμβανόμενη εφαρμογή αυτής της διαδικασίας θα συγκλίνει σε μια λύση.

Η εργασία αφορά το γράψιμο ενός προγράμματος Υπερκύβου που λύνει ένα σύστημα γραμμικών εξισώσεων χρησιμοποιώντας αυτή την επαναληπτική μέθοδο. Σε κάθε επεξεργαστή θα ανατεθεί μια εξίσωση και αυτός θα είναι υπεύθυνος για τον επαναληπτικό υπολογισμό των διαδοχικών τιμών x[i]. Μετά τον υπολογισμό του στοιχείου του x που του έχει ανατεθεί από κάθε επεξεργαστή καθένας πρέπει να λάβει ένα αντίγραφο όλων των νέων τιμών από τους άλλους επεξεργαστές πριν να προχωρήσει στην επόμενη επανάληψη. Αυτή η ανταλλαγή των τιμών είναι στην πραγματικότητα μια λειτουργία πολλαπλής διάδοσης.

Για να καθορίσουμε πότε πρέπει να τερματίσει το πρόγραμμα καθορίζεται ως είσοδος μια επιθυμητή ακρίβεια της λύσης. Μετά τον υπολογισμό από κάθε επεξεργαστή της νέας τιμής για την τιμή x[i] που του έχει ανατεθεί, γίνεται σύγκριση του μεγέθους της αλλαγής του x[i] με την επιθυμητή ακρίβεια. Το αποτέλεσμα όλων των συγκρίσεων από όλους τους επεξεργαστές πρέπει να συλλεχθεί για να καθοριστεί αν το πρόγραμμα πρέπει να τελειώσει ή όχι. Για να εξοικονομήσουμε υπολογιστικό χρόνο, αυτή η συλλογή μπορεί να ενσωματωθεί στην πολλαπλή διάδοση με την περίληψη μιας επιπλέον τιμής στη διάδοση που αναφέρεται στα αποτελέσματα όλων των ελέγχων τερματισμού. Έτσι, σε κάθε k βήμα της πολλαπλής διάδοσης κάθε διεργασία αντί να στέλνει μόνο 2k-1 τιμές στο ταίρι της, στέλνει 2k-1+1 τιμές.

Όταν θα ελέγξετε το πρόγραμμά σας είναι απαραίτητο να επιλέξετε ένα δείγμα συστήματος εξισώσεων που συγκλίνει τελικά σε μία λύση. Μια προϋπόθεση που εγγυάται σύγκλιση είναι κυριαρχία του διαγώνιου στοιχείου της μήτρας Α (δες Bertsekas και Tsitsiklis, 1989):

 

Για κάθε γραμμή i, έχουμε: image

 

ΠΡΟΤΕΙΝΟΜΕΝΕΣ ΛΥΣΕΙΣ

 

1.

program sort;


    architecture line(11);


    Const 


        n=100;


        groupsize=10;


        groups=10;


    Type 


        grouplist=array[1..groupsize] of integer;


    Var


        i,j:integer;


        chan:array [0..groups+1] of channel of integer;


        in,out:array [1..groups] of grouplist;


 

procedure ranksort(var list:grouplist);


    var 


        i,j,rank,tmp:integer;


    begin (*Ταξινόμηση της i γραμμής του πίνακα*)


        for i:=1 to groupsize do 


        begin


            tmp:=list[i];


            j:=i;


            rank:=0;


            repeat


                j:=j mod groupsize+1;


                if tmp>list[j] then (*αν ο αριθμός είναι μεγαλύτερος η θέση του *)


                    rank:=rank+1 (*στον list αυξάνεται κατά 1*)

                else 

                    if tmp=list[j] then (*αν ο αριθμός είναι ίσος, η θέση του στον list *)

                if j<=i then (*αυξάνεται αν η θέση του στον tmp *)

                    rank:=rank+1; (*είναι μεγαλύτερη(ή μικρότερη) ίση*)

             until j=i;

            list[rank]:=tmp; 

        end;

    end;

procedure sortlist(var list:grouplist;lr:integer);

    var 

        i,tmp:integer;

 

    begin (*ταξινόμηση της γραμμής μετά την εισαγωγή του νέου στοιχείου*)

        if lr=0 then (* στο τέλος της*)

        begin 

            for i:=1 to groupsize-1 do

                if list[i]>list[i+1] then (*μεταφορά του μεγαλύτερου στο τέλος*)

                begin

                    tmp:=list[i];

                    list[i]:=list[i+1];

                    list[i+1]:=tmp;

                end;

            end

            else

            begin 

                for i:=groupsize downto 2 do

                    if list[i]‹list[i-1] then (*μεταφορά του μικρότερου στην αρχή*)

                    begin

                        tmp:=list[i];

                        list[i]:=list[i-1];

                        list[i-1]:=tmp;

                    end;

            end;

        end;

 

procedure replace(me:integer;inlist:grouplist;var outlist:grouplist);

    var 

        i,j,tmp:integer;

        tmplist:array[1..groupsize+1] of integer;

   begin

        ranksort(inlist);

        for i:=1 to (n div 2) do

        begin

            if me=1 then

            begin

                chan[me+1]:=inlist[groupsize]; (*στέλνει το μεγαλύτερο στα δεξιά*)

                inlist[groupsize]:=chan[me]; (*διαβάζει το τελευταίο στοιχείο*)

                sortlist(inlist,1);

            end

            else if me=groups then

          begin

                tmplist[1]:=chan[me]; (* διαβάζει από τα αριστερά*)

                for j:=2 to groupsize+1 do 

                    tmplist[j]:=inlist[j-1];

                for j:=1 to groupsize do (*ταξινόμηση της γραμμής μετά από την *)

                    if tmplist[j]>tmplist[j+1] then (*εισαγωγή του νέου στοιχείου στην αρχή*)

                    begin

                        tmp:=tmplist[j];

                        tmplist[j]:=tmplist[j+1];

                        tmplist[j+1]:=tmp;

                    end;

                    for j:=2 to groupsize+1 do 

                        inlist[j-1]:=tmplist[j];

                    chan[me-1]:=tmplist[1]; (*στέλνει το μικρότερο αριστερά*)

                end

                else

                begin

                    tmp:=chan[me]; (*διαβάζει από τα αριστερά*)

                    chan[me+1]:=inlist[groupsize]; (*στέλνει προς τα δεξιά*)

                    inlist[groupsize]:=tmp;

                    sortlist(inlist,1);

                    tmp:=chan[me]; (* διαβάζει από τα δεξιά*)

                    chan[me-1]:=inlist[1]; (* στέλνει προς τα αριστερά*)

                    inlist[1]:=tmp;

                    sortlist(inlist,0);

                end;

            end;

            outlist:=inlist;

        end;

Begin

    for i:=1 to groups do (*Αρχικοποίηση*)

        for j:=1 to groupsize do 

            readln(in[i,j]);

    forall i:=1 to groups do 

        (@i port chan[i]) replace(i,in[i],out[i]);

    for i:=1 to groups do 

    begin (*Εμφάνιση αποτελεσμάτων*)

        writeln;

        for j:=1 to groupsize do 

            write(in[i,j]:5);

      end;

     writeln;

    for i:=1 to groups do 

    begin

        writeln;

        for j:=1 to groupsize do 

            write(out[i,j]:5);

   end;

End.

image

 

3.

       Program Gauss_Seidel;


            Architecture Hypercube(3);


            Const


                d=3;


                n=8;


                tolerance=.1;


            type


                listtype=array[0..n] of real;


                datatype=array[1..n] of real;


            Var


                  A=array [1..n] of datatype;


                B,Xin,Xout:datatype;


                chan1:array[0..n,1..d] of channel of real;


                chan2:array[o..n,1..d] of channel of boolean;


                Graycode:array[1..n] of integer;


                i,j:integer;


            Procedure Broadcast(myvalue:real;Var mylist:listtype;mydone:boolean;Var done:boolean);


                var


                    mynum,partner,bitvalue,stage,l:integer;


                    hisdone:boolean;


            begin


                mynum:=%Self;


                mylist[0]:=myvalue;


                bitvalue:=1;


                for stage:=1 to d do


                begin


                    if mynum div bitvalue mod 2=0 then  partner:=mynum+bitvalue


                    else partnet:=mynum-bitvalue;


                    for l:=0 to bitvalue-1 do


                        chan1[partner,stage]:=mylist[1];


                    chan2[partner,stage]:=mydone;


                    for l:=bitvalue to 2*bitvalue-1 do


                        mylist[1]:=chan1[mynum,stage];


                    hisdone:=chan2[mynum,stage];


                    mydone:=mydone and hisdone;


                    bitvalue:=2*bitvalue;


                end;


                done:=mydone;


            end;


            Procedure Process(mynumber:integer;inA:datatype;inB:real;inX:datatype;Var outX:datatype);


                Var


                    sum,oldvalue,change:real;


                    k,t,i:integer;


                    values:listtype;


                    done:boolean;


            begin


                repeat


                    oldvalue:=inX[mynumber];


                    sum:=0.0;


                    for k:=1 to n do


                        if (mynumber‹›k) then sum:=sum+inA[k]*inX[k];

                    inX[mynumber]:=-1/inA[mynumber]*(sum-inB);

                    change:=abs(inX[mynumber]-oldvalue);

                    Broadcast(inX[mynumber],values,change‹tolerance,done);

                    for t:=1 to n do

                        inX[t]:=values[t];

                 until done;

                ouX:=inX;

            end;

        begin

            Graycode[1]:=0;Graycode[2]:=1;Graycode[2]:=3;Graycode[4]:=2;

            Graycode[5]:=6;Graycode[6]:=7;Graycode[7]:=5;Graycode[8]:=4;

            for i:=1 to n do

                for j:=1 to n do

                    readln(A[i,j]);

            for i :=1 to n do

                readln(B[i]);

            for i:=1 to n do

                Xin[i]:=1;

            forall i:=1 to n do

                (@Graycode[i] port chan1[Graycode[i]];chan2[graycode[i]])

            Process(i,A[i],B[i],Xin,Xout);

            for i:=1 to n do

                writeln(Xout[i]);

        end.


     Next             Up                Back                   Contents

Επόμενο:Aσκήσεις Πάνω: Κεφάλαιο 8o: Προγράμματα περάσματος μηνυμάτων Πίσω: Αναφορές