Source mp_broadcast.nas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
###############################################################################
##
## A message based information broadcast for the multiplayer network.
##
## Copyright (C) 2008 - 2013 Anders Gidenstam (anders(at)gidenstam.org)
## This file is licensed under the GPL license version 2 or later.
##
###############################################################################
###############################################################################
# Event broadcast channel using a MP enabled string property.
# Events from users in multiplayer.ignore are ignored.
#
# EventChannel.new(mpp_path)
# Create a new event broadcast channel. Any MP user with the same
# primitive will receive all messages sent to the channel from the point
# she/he joined (barring severe MP packet loss).
# NOTE: Message delivery is not guaranteed.
# mpp_path - MP property path : string
#
# EventChannel.register(event_hash, handler)
# Register a handler for the event identified by the hash event_hash.
# event_hash - hash value for the event : a unique 4 character string
# handler - a handler function for the event : func (sender, msg)
#
# EventChannel.deregister(event_hash)
# Deregister the handler for the event identified by the hash event_hash.
# event_hash - hash value for the event : a unique 4 character string
#
# EventChannel.send(event_hash, msg)
# Sends the event event_hash with the message msg to the channel.
# event_hash - hash value for the event : a unique 4 character string
# msg - text string with Binary data encoded data : string
#
# EventChannel.die()
# Destroy this EventChannel instance.
#
var EventChannel = {};
EventChannel.new = func (mpp_path) {
var obj = BroadcastChannel.new(mpp_path,
func (n, msg) { obj._process(n, msg) });
# Save send from being overriden.
obj.parent_send = obj.send;
# Put EventChannel methods before BroadcastChannel methods.
obj.parents = [EventChannel] ~ obj.parents;
obj.events = {};
return obj;
}
EventChannel.register = func (event_hash,
handler) {
me.events[event_hash] = handler;
}
EventChannel.deregister = func (event_hash) {
delete(me.events, event_hash);
}
EventChannel.send = func (event_hash,
msg) {
me.parent_send(event_hash ~ msg);
}
############################################################
# Internals.
EventChannel._process = func (n, msg) {
var event_hash = Binary.readHash(msg);
if (contains(me.events, event_hash)) {
me.events[event_hash](n, substr(msg, Binary.sizeOf["Hash"]));
}
}
###############################################################################
# Broadcast primitive using a MP enabled string property.
# Broadcasts from users in multiplayer.ignore are ignored.
#
# BroadcastChannel.new(mpp_path, process)
# Create a new broadcast primitive. Any MP user with the same
# primitive will receive all messages sent to the channel from the point
# she/he joined (barring severe MP packet loss).
# NOTE: Message delivery is not guaranteed.
# mpp_path - MP property path : string
# process - handler called when receiving a message : func (n, msg)
# n is the base node of the senders property tree
# (i.e. /ai/models/multiplay[x])
# send_to_self - if 1 locally sent messages are : int {0,1}
# delivered just like remote messages.
# If 0 locally sent messages are not delivered
# to the local receiver.
# accept_predicate - function to select which : func (p)
# multiplayers to listen to.
# p is the multiplayer entry node.
# The default is to accept any multiplayer.
# on_disconnect - function to be called when an : func (p)
# accepted MP user leaves.
# enable_send - Set to 0 to disable sending.
#
# BroadcastChannel.send(msg)
# Sends the message msg to the channel.
# msg - text string with Binary data encoded data : string
#
# BroadcastChannel.die()
# Destroy this BroadcastChannel instance.
#
var BroadcastChannel = {};
BroadcastChannel.new = func (mpp_path, process,
send_to_self = 0,
accept_predicate = nil,
on_disconnect = nil,
enable_send=1) {
var obj = { parents : [BroadcastChannel],
mpp_path : mpp_path,
send_node : enable_send ? props.globals.getNode(mpp_path, 1)
: nil,
process_msg : process,
send_to_self : send_to_self,
accept_predicate :
(accept_predicate != nil) ? accept_predicate
: func (p) { return 1; },
on_disconnect : (on_disconnect != nil) ? on_disconnect
: func (p) { return; },
# Internal state.
started : 0, # External state: started/stopped.
running : 0, # Internal state: running or not.
send_buf : [],
peers : {},
loopid : 0,
last_time : 0.0, # For join handling.
last_send : 0.0 # For the send queue
};
if (enable_send and (obj.send_node == nil)) {
logprint(LOG_WARN, "BroadcastChannel invalid send node.");
return nil;
}
setlistener(obj.ONLINE_pp, func {
obj.set_state();
});
obj.start();
return obj;
}
BroadcastChannel.send = func (msg) {
if (!me.running or me.send_node == nil)
return;
var t = getprop("/sim/time/elapsed-sec");
if (((t - me.last_send) > me.SEND_TIME) and (size(me.send_buf) == 0)) {
me.send_node.setValue(msg);
me.last_send = t;
if (me.send_to_self) me.process_msg(props.globals, msg);
} else {
append(me.send_buf, msg);
}
}
BroadcastChannel.die = func {
me.loopid += 1;
me.started = 0;
me.running = 0;
#print("BroadcastChannel[" ~ me.mpp_path ~ "] ... destroyed.");
}
BroadcastChannel.start = func {
#print("mp_broadcast.nas: starting channel " ~ me.mpp_path ~ ".");
me.started = 1;
me.set_state();
}
BroadcastChannel.stop = func {
#print("mp_broadcast.nas: stopping channel " ~ me.mpp_path ~ ".");
me.started = 0;
me.set_state();
}
############################################################
# Internals.
BroadcastChannel.ONLINE_pp = "/sim/multiplay/online";
BroadcastChannel.PERIOD = 1.3;
BroadcastChannel.SEND_TIME = 0.6;
BroadcastChannel.set_state = func {
if (me.started and getprop(me.ONLINE_pp)) {
if (me.running) return;
#print("mp_broadcast.nas: activating channel " ~ me.mpp_path ~ ".");
me.running = 1;
me._loop_(me.loopid += 1);
} else {
#print("mp_broadcast.nas: deactivating channel " ~ me.mpp_path ~ ".");
me.running = 0;
me.loopid += 1;
}
}
BroadcastChannel.update = func {
var t = getprop("/sim/time/elapsed-sec");
var process_msg = me.process_msg;
# Handled join/leave. This is done more seldom.
if ((t - me.last_time) > me.PERIOD) {
var mpplayers =
props.globals.getNode("/ai/models").getChildren("multiplayer");
foreach (var pilot; mpplayers) {
var valid = pilot.getChild("valid");
if ((valid != nil) and valid.getValue() and
!contains(multiplayer.ignore,
pilot.getChild("callsign").getValue())) {
if ((me.peers[pilot.getIndex()] == nil) and
me.accept_predicate(pilot)) {
me.peers[pilot.getIndex()] =
MessageChannel.
new(pilot.getNode(me.mpp_path),
MessageChannel.new_message_handler(process_msg, pilot));
}
} else {
if (contains(me.peers, pilot.getIndex())) {
delete(me.peers, pilot.getIndex());
me.on_disconnect(pilot);
}
}
}
me.last_time = t;
}
# Process new messages.
foreach (var w; keys(me.peers)) {
if (me.peers[w] != nil) me.peers[w].update();
}
# Check send buffer.
if (me.send_node == nil) return;
if ((t - me.last_send) > me.SEND_TIME) {
if (size(me.send_buf) > 0) {
me.send_node.setValue(me.send_buf[0]);
if (me.send_to_self) me.process_msg(props.globals, me.send_buf[0]);
me.send_buf = subvec(me.send_buf, 1);
me.last_send = t;
} else {
# Nothing new to send. Reset the send property to save bandwidth.
me.send_node.setValue("");
}
}
}
BroadcastChannel._loop_ = func (id) {
me.running or return;
id == me.loopid or return;
#print("mp_broadcast.nas: " ~ me.mpp_path ~ ":" ~ id ~ ".");
me.update();
settimer(func { me._loop_(id); }, 0, 1);
}
######################################################################
###############################################################################
# Lamport clock. Useful for creating a total order for events or messages.
# The users' callsigns are used to break ties.
#
# LamportClock.new()
# Creates a new lamport clock for this user.
#
# LamportClock.merge(sender, sender_timestamp)
# Merges the timestamp from the sender with the local clock.
# sender : base node of the senders property tree
# sender_timestamp : the timestamp received from the sender.
# Returns 1 if the local clock was advanced; 0 otherwise.
#
# LamportClock.advance()
# Advances the local clock one tick.
#
# LamportClock.timestamp()
# Returns an encoded 4 character long timestamp from the local clock.
#
var LamportClock = {
# LamportClock.new()
# Creates a new lamport clock for this user.
new : func {
var obj = {
parents : [LamportClock],
callsign : getprop("/sim/multiplay/callsign"),
time : 0
};
return obj;
},
merge : func (sender, sender_timestamp) {
var sender_time = Binary.decodeInt28(sender_timestamp);
if (sender_time > me.time) {
me.time = sender_time;
return 1;
} elsif ((sender_time == me.time) and
(cmp(sender.getNode("callsign").getValue(), me.callsign) > 0)) {
return 1;
} else {
# The received timestamp is old and should be ignored.
return 0;
}
},
advance : func {
me.time += 1;
},
timestamp : func {
return Binary.encodeInt28(me.time);
}
};
###############################################################################
# Some routines for encoding/decoding values into/from a string.
# NOTE: MP is picky about what it sends in a string propery.
# Encode 7 bits as a printable 8 bit character.
var Binary = {};
Binary.TWOTO27 = 134217728;
Binary.TWOTO28 = 268435456;
Binary.TWOTO31 = 2147483648;
Binary.TWOTO32 = 4294967296;
Binary.sizeOf = {};
############################################################
Binary.sizeOf["int"] = 5;
Binary.encodeInt = func (int) {
var bf = bits.buf(5);
if (int < 0) int += Binary.TWOTO32;
var r = int;
for (var i = 0; i < 5; i += 1) {
var c = math.mod(r, 128);
bf[4-i] = c + `A`;
r = (r - c)/128;
}
return bf;
}
############################################################
Binary.decodeInt = func (str) {
var v = 0;
var b = 1;
for (var i = 0; i < 5; i += 1) {
v += (str[4-i] - `A`) * b;
b *= 128;
}
if (v / Binary.TWOTO31 >= 1) v -= Binary.TWOTO32;
return int(v);
}
############################################################
# NOTE: This encodes a 7 bit byte.
Binary.sizeOf["byte"] = 1;
Binary.encodeByte = func (int) {
var bf = bits.buf(1);
if (int < 0) int += 128;
bf[0] = math.mod(int, 128) + `A`;
return bf;
}
############################################################
Binary.decodeByte = func (str) {
var v = str[0] - `A`;
if (v / 64 >= 1) v -= 128;
return int(v);
}
############################################################
# NOTE: This encodes a 28 bit integer.
Binary.sizeOf["int28"] = 4;
Binary.encodeInt28 = func (int) {
var bf = bits.buf(4);
if (int < 0) int += Binary.TWOTO32;
var r = int;
for (var i = 0; i < 4; i += 1) {
var c = math.mod(r, 128);
bf[3-i] = c + `A`;
r = (r - c)/128;
}
return bf;
}
############################################################
Binary.decodeInt28 = func (str) {
var v = 0;
var b = 1;
for (var i = 0; i < 4; i += 1) {
v += (str[3-i] - `A`) * b;
b *= 128;
}
if (v / Binary.TWOTO27 >= 1) v -= Binary.TWOTO28;
return int(v);
}
############################################################
# NOTE: This can neither handle huge values nor really tiny.
Binary.sizeOf["double"] = 2*Binary.sizeOf["int"];
Binary.encodeDouble = func (d) {
return Binary.encodeInt(int(d)) ~
Binary.encodeInt((d - int(d)) * Binary.TWOTO31);
}
############################################################
Binary.decodeDouble = func (str) {
return Binary.decodeInt(substr(str, 0)) +
Binary.decodeInt(substr(str, 5)) / Binary.TWOTO31;
}
############################################################
# Encodes a geo.Coord object.
Binary.sizeOf["Coord"] = 3*Binary.sizeOf["double"];
Binary.encodeCoord = func (coord) {
return Binary.encodeDouble(coord.lat()) ~
Binary.encodeDouble(coord.lon()) ~
Binary.encodeDouble(coord.alt());
}
############################################################
# Decodes an encoded geo.Coord object.
Binary.decodeCoord = func (str) {
var coord = geo.aircraft_position();
coord.set_latlon(Binary.decodeDouble(substr(str, 0)),
Binary.decodeDouble(substr(str, 10)),
Binary.decodeDouble(substr(str, 20)));
return coord;
}
############################################################
# Encodes a string as a hash value.
Binary.sizeOf["Hash"] = 4;
Binary.stringHash = func (str) {
var hash = 0;
for(var i=0; i<size(str); i+=1) {
hash += math.mod(32*hash + str[i], Binary.TWOTO28-3);
}
return substr(Binary.encodeInt(hash), 1, 4);
}
############################################################
# Decodes an encoded geo.Coord object.
Binary.readHash = func (str) {
return substr(str, 0, Binary.sizeOf["Hash"]);
}
############################################################
Binary.sizeOf["LamportTS"] = 4;
######################################################################
###############################################################################
# Detects incomming messages encoded in a string property.
# n - MP source : property node
# process - action : func (v)
# NOTE: This is a low level component.
# The same object is seldom used for both sending and receiving.
var MessageChannel = {};
MessageChannel.new = func (n = nil, process = nil) {
var obj = { parents : [MessageChannel],
node : n,
process_msg : process,
old : "" };
return obj;
}
MessageChannel.update = func {
if (me.node == nil) return;
var msg = me.node.getValue();
if (!streq(typeof(msg), "scalar")) return;
if ((me.process_msg != nil) and
!streq(msg, "") and
!streq(msg, me.old)) {
me.process_msg(msg);
me.old = msg;
}
}
MessageChannel.send = func (msg) {
me.node.setValue(msg);
}
MessageChannel.new_message_handler = func (handler, arg1) {
var local_arg1 = arg1; # Disconnect from future changes to arg1.
return func (msg) { handler(local_arg1, msg) };
};