load std autoload=std fn idxw { tr A-Z a-z | sed ' s/[^a-zA-Z0-9_]+/ /g s/[ ][ ]*/\n/g ' |sed -e 's/\.\.*$//' -e '/^$/d' | sed -e 's/$/ 1/' } fn refresh { names=`{ndb/query -a kfs '' name} for (i in $names) { (host chunk) = ${split ! `{ndb/query name $i master}} addr = `{ndb/regquery -n name $host id $chunk} if {~ $#addr 1} { if {ftest -e /mnt/registry/ ^$i} { echo host $host automount 1 persist 1 addr $addr replica `{ndb/query name $i replica}> /mnt/registry/^$i } { echo $i host $host automount 1 persist 1 addr $addr replica `{ndb/query name $i replica}> /mnt/registry/new } } {rm -f /mnt/registry/ ^$i} } } fn rmnt { for (file in $*) { (disk rest ) := `{cat /mnt/registry/$file} while {! ~ $#rest 0} { (name val tail) := $rest if { ~ $name 'addr'} {mount -c $val /n/ ^ $disk} rest = $tail } } } fn diskinit { rmnt d0.dist $1 cp -r /n/d0.dist/adm /n/$1 } fn update { rmnt d0.dist install/applylog -s -T /n/local/dist/time /n/local/dist/client.log /n/local /n/d0.dist < /n/d0.dist/dist/dist.log } fn replica { if {~ $#* 1} { disk:=$1 master:=`{ndb/regquery -n replica $disk} if{~ $#master 1} { rmnt $disk $master install/applylog -s -T /n/$disk/dist/time /n/$disk/dist/client.log /n/$disk /n/$master < /n/$master/dist/server.log } {echo replica: no master >[1=2]} } {echo usage: replica disk >[1=2]} } fn sync { if {~ $#* 1} { disk:=$1 builtin cd /n/ ^ $disk install/updatelog -x dist/server.log dist/server.log >> dist/server.log } {echo usage: sync disk >[1=2] } } rpid=() fn rcpu { cmd:=$1 disk:=${tl $*} cpu:=${nextcpu} (net host port) := ${split '!' $cpu} # echo nextcpu $net $host $port >[1=2] disk = ${finddisk $host} $disk s=${parse '{cpudisk=/n/' ^ ${hd $disk} ^' run /n/client/usr/caerwyn/lib/gridrc rmnt ' ^ $"disk ^ ' ' ^ $cmd ^ '}'} cpu $cpu sh -c $s & rpid=$apid $rpid echo ${hd $disk} } fn newcpu { grid/reglisten -r svc rstyx 'tcp!*!0' {runas $user auxi/rstyxd&} } fn gridinit { mount -ac {mntgen} /n ndb/cs chmod 666 /net/cs mount 'net!$registry!registry' /mnt/registry } fn newdisk { disk := $1 if {! ftest -e $disk} { zeros 1024 65536 > $disk } cmd= ${parse '{disk/kfs -rPW ' ^ $disk ^ '}' } grid/register -a id `{basename $disk} $cmd } fn registerdisk { for (i in $*) { cmd= ${parse '{disk/kfs -PW ' ^ $i ^ '}' } grid/register -a id `{basename $i} $cmd } } cpulist=() subfn nextcpu { if {~ $#cpulist 0} {cpulist=`{ndb/regquery -n svc rstyx} } result = ${hd $cpulist} cpulist = ${tl $cpulist} } subfn finddisk { host := $1 if {~ $#$host 0} {$host=`{ndb/regquery -n host $host | grep cpu}} result = ${hd $$host} $host=${tl $$host} } fn rwaiting { if {! ~ $#rpid 0} { ps |grep ${join '|' $rpid} rpid=${rwaiting} } } subfn rwaiting { result=() if {! ~ $#rpid 0} { result=`{ps |grep ${join '|' $rpid} |getlines {(p rest) :=${split ' ' $line}; echo $p}} } } fn mapreduce { rpid=() map:=$1 reduce:=$2 disks:=${tl ${tl $*}} jobid:=${pid} seq:=0 cpudisklist:=() for (i in $disks) { cmd := ${parse '{ ' ^ $map ^ ' |intermediate 5 $cpudisk/' ^ $jobid ^ '.' ^ $seq ^ '.inter }'} cpudisklist=`{rcpu $cmd $i} $cpudisklist seq=`{fc 1 $seq +} } # echo cpudisklist $cpudisklist while {! ~ $#{rwaiting} 0} { sleep 1 } rpid=() dlist:=() for (i in 0 1 2 3 4) { cmd := ${parse '{ cat /n/d?.cpu/' ^ $jobid ^'.*.inter' ^ $i ^ ' | sort | ' ^ $reduce ^ ' > $cpudisk/' ^ $jobid ^ '.part' ^ $i ^ '}' } dlist = `{rcpu $cmd $cpudisklist} $dlist } while {! ~ $#{rwaiting} 0} { sleep 1 } rpid=() echo $dlist } subfn accgen { pctl forkns unmount /mnt/agg { data := '0' {file2chan /mnt/agg/accgen {rreadone $data} {data = `{fc ${rget data} $data + } } } } grid/reglisten -r svc aggregator 'tcp!*!0' {export /mnt/agg &} > /tmp/svc svc=`{cat /tmp/svc} result = ${parse '{mount '^$svc^' /n/'^$svc^' ; echo -n $1 > /n/'^$svc^'/accgen; cat /n/'^$svc^'/accgen }' } } fn launchchunkq { mkdir /tmp/chunkq chunkqueue /tmp/chunkq/cq grid/reglisten -r svc chunkq 'tcp!*!0' {export /tmp/chunkq&} } fn freechunkscan { pctl forkns mount `{ndb/regquery -n svc chunkq} /n/chunkq for (i in `{ndb/regquery -n resource kfs}) { mount $i /n/remote builtin cd /n/remote # these should get added to the chunqueue for (j in *.ckfree ) { if {ftest -e $j} { echo $j $i > /n/chunkq/cq} } } } fn locatorscan { pctl forkns for (i in `{ndb/regquery -n resource kfs}) { mount $i /n/remote builtin cd /n/remote #these get added to the chunkid locator map for (j in *.ck ) { if {ftest -e $j} {echo $j $i} } } } CHUNKSIZE=1048576 fn putfiles { load expr pctl forkns rmnt master mount `{ndb/regquery -n svc chunkq} /n/chunkq bigfile:=$1 files:=${tl $*} chunk := `{tail -1 $bigfile} if {~ $#chunk 0} { chunk = ${allochunk} echo $chunk >> $bigfile } mount -c ${locatechunk $chunk} /n/remote for(i in $files){ size=0 (perm device inst owner group size rest) := `{ls -l /n/remote/$chunk} if { ntest ${expr $size $CHUNKSIZE gt} } { chunk = ${allochunk} echo $chunk >> $bigfile mount -c ${locatechunk $chunk} /n/remote } putwar $i |gzip >> /n/remote/$chunk } } subfn allochunk { chunkid := `{next /n/master/next} ^ .ck (free disk) := `{read < /n/chunkq/cq} mount -c $disk /n/remote mv /n/remote/^$free /n/remote/^$chunkid echo $chunkid $disk >> /n/master/locator result=$chunkid } subfn locatechunk { #should be called within putfiles only chunk := $1 (c disk) := `{grep $chunk /n/master/locator} result=$disk }