Select Git revision
development_requirements.txt
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
This project manages its dependencies using pip.
Learn more
rMtS.py 23.26 KiB
"""
Repetitive match-to-sample test
A combination of Alekseichuk et al., 2016 (https://doi.org/10.1016/j.cub.2016.04.035) and Berger et al., 2019 (https://doi.org/10.1038/s41467-019-12057-0)
"""
from psychopy import visual, data, logging, gui, core, clock, monitors
import os
from psychopy.constants import (NOT_STARTED, STARTED, PLAYING, PAUSED,
STOPPED, FINISHED, PRESSED, RELEASED, FOREVER)
from numpy import mean, ceil, array, concatenate, linspace, ones, inf, isin, array2string
from numpy.random import shuffle, permutation, randint, choice
from itertools import product
from collections import OrderedDict
from pyniexp.scannersynch import scanner_synch
from pyniexp.stimulation import Waveform, Stimulator
from utils import generate_jitter, generate_sample, get_neighbor
if __name__ == '__main__':
expName = 'repetitive Match-to-Sample'
expInfo = OrderedDict([
('participant',''),
('session','1'),
('grid size',6), # number of cells per axis -> number of cells = gridXY**2
('sample size',4), # number of circles (cannot be more than half of the number of cells)
('sample retention time','[0.5, 1.5]'), # delay after sample
('match type', ['single','multi']), # match type ("single" - ~Alekseichuk or "multi" - ~Berger)
('match number',2), # number of matches per sample
('scanner mode', False), # Is it inside the scanner
('stimulation', False), # Run tES
('stimulation intensity [mA]', 1) # desired intensity (mA)
])
dlg = gui.DlgFromDict(dictionary=expInfo, title=expName, sortKeys=False)
if dlg.OK == False:
core.quit() # user pressed cancel
expInfo['date'] = data.getDateStr() # add a simple timestamp
######## CONFIG ########
# Settings
Monitor = 'testMonitor'
EMUL = not(expInfo['scanner mode']) # Is it outside the scanner
doSTIMULATION = expInfo['stimulation'] # Run tES
# - timings
nDummies = 5
restDuration = 16 # duration of rest between blocks
nBlock = 15 # number of blocks
# Schedule of trial: sampleJitter -> sample -> matchJitter + filler -> nMatch x [ match (same duration as sample) -> responseJitter -> response ]
# Actual: 0.5 + 0.5 + 1 + 2 * ( 0.5 + 0 + 1 ) = 5
nSample = 6 # number of samples per block
sampleJitterRange = [0.25, 0.75] # jitter before sample
sampleDuration = 0.5 # duration of the sample
nMatch = expInfo['match number'] # number of matches per sample
matchJitterRange = eval(expInfo['sample retention time']) # delay after sample
fillerDuration = 0.25
responseJitterRange = [0, 0] # jitter between events
responseDuration = 1 # maximum duration of response
# - brain stimulation
defWave = {
'amplitude': float(expInfo['stimulation intensity [mA]']), # desired intensity (mA)
'frequency': 10,
'phase': 0,
'duration':
nSample*(
array(sampleJitterRange).mean()+
sampleDuration+
array(matchJitterRange).mean()+
nMatch*(
sampleDuration +
array(responseJitterRange).mean()+
responseDuration
)
), # 5s is the lenght if a trial (see above)
'rampUp': 3,
'rampDown': 3,
'samplingRate': 2000,
}
phaseDiff = 0 # phase of thes second channel
frequencies = [0,5,10,20,60] # x5 frequencies
gridSize = 600 # width and height of the grid
gridXY = expInfo['grid size'] # number of cells per axis -> number of cells = gridXY**2
colour = array([-0.5,-0.5,-0.5])
sampleNum = expInfo['sample size'] # number of circles (cannot be more than half of the number of cells)
sampleSize = 0.9 # circle size relative to cell size
tolJitter = 1e-3
# Jitters
sampleJitter = generate_jitter(sampleJitterRange,nSample,tolJitter)
matchJitter = generate_jitter(matchJitterRange,nSample,tolJitter)
responseJitter = generate_jitter(responseJitterRange,nSample*nMatch,tolJitter)
frequencies = concatenate([permutation(frequencies) for i in range(int(ceil(nBlock/len(frequencies))))])
# Grid
sampleSize = gridSize / gridXY * sampleSize
cellCoordinates = gridSize/gridXY*(gridXY-1)/2
cellCoordinates = array(list(product(linspace(-cellCoordinates,cellCoordinates,gridXY),repeat=2)))
gridCoordinates = array([(l,0) for l in linspace(-gridSize/2,gridSize/2,gridXY+1)] + [(0,l) for l in linspace(-gridSize/2,gridSize/2,gridXY+1)])
gridAngles = [90]*(gridXY+1) + [0]*(gridXY+1)
# Buttons
# - button 6 and 7 (zero-indexed) corresponding to NATA right index and middle finger
bYes = 6 # button for 'Yes'
bNo = 7 # button for 'No'
tTot = (restDuration + nBlock * (nSample *
(mean(sampleJitterRange) + sampleDuration + mean(matchJitterRange) +
nMatch * (sampleDuration + mean(responseJitterRange) + responseDuration))
+ restDuration))/60
infoStr = [
"You will perform a visual-spatial match-to-sample test, which is commonly used to assess spatial working memory.\n\nThe test takes about {:.1f} minute(s) and consists of {} trials. In each trial, first, you will see a sample stimulus: {} dots within a grid. This is followed by a mask stimulus, which is the same grid filled with grey dots.\n\nPress 'Yes' to continue.".format(tTot, nBlock*nSample, sampleNum),
"Then, after shorter or longer delay, you will be presented with {} set(s) of test. The tests include a test stimulus, appearance of a single dot, followed by a response period of {:.1f} second(s) marked with a question mark.\n\nYou should press 'Yes' or 'No' during the response period indicating whether the location of the dot in the test stimulus matches any of those in the sample stimulus.\n\nPress 'Yes' when ready to start.".format(nMatch,responseDuration)
]
infoImg = [
os.path.join(os.getcwd(),'html','resources','flow0.jpg'),
os.path.join(os.getcwd(),'html','resources','flow1.jpg')
]
######## LOGGING ########
_thisDir = os.path.dirname(os.path.abspath(__file__))
os.chdir(_thisDir)
filename = _thisDir + os.sep + u'data/%s_%s_%s' % (expInfo['participant'], expName, expInfo['date'])
thisExp = data.ExperimentHandler(name=expName, version='',
extraInfo=expInfo, runtimeInfo=None,
originPath=os.path.join(os.getcwd(),'rMtS.py'),
savePickle=True, saveWideText=True,
dataFileName=filename)
logFile = logging.LogFile(filename+'.log', level=logging.EXP)
logging.console.setLevel(logging.WARNING) # this outputs to the screen, not a file
######## PREPARE ########
blockTrials = []
for b in range(nBlock):
blockTrials += [OrderedDict([])]
sampleTrials = []
shuffle(sampleJitter); shuffle(matchJitter); shuffle(responseJitter)
for s in range(nSample):
[sampleSelection, allSelection] = generate_sample(gridXY,sampleNum,1)
sampleTrials += [OrderedDict([
('onsetSample',sampleJitter[s]),
('sample',sampleSelection),
('onsetMatch',matchJitter[s])
])]
matchTrials = []
for m in range(nMatch):
if expInfo['match type'] == 'single':
match = allSelection[m:m+1]
elif expInfo['match type'] == 'multi':
match = sampleSelection.copy()
indReplace = randint(0,sampleNum)
poolReplace = get_neighbor(gridXY,match[indReplace])
poolReplace = [i for i in poolReplace if i not in match] # non-inclusive neighbors
poolReplace += [match[indReplace]]*len(poolReplace) # pool with equal number of original value and its neighbors (i.e. p=0.5)
match[indReplace] = choice(poolReplace,1)
else:
assert False,'Match type "{}" is not recognised'.format(expInfo['match type'])
matchTrials += [OrderedDict([
('match',match),
('onsetResponse',responseJitter[s*nMatch+m])
])]
sampleTrials[-1]['matchTrials'] = matchTrials
blockTrials[-1]['sampleTrials'] = sampleTrials
blockTrials[-1]['frequency'] = frequencies[b]
loopBlock = data.TrialHandler(nReps=1, method='sequential', trialList=blockTrials, autoLog=False)
thisExp.addLoop(loopBlock)
# Scanner and buttons
SSO = scanner_synch(config='config_scanner.json',emul_synch=EMUL,emul_buttons=EMUL)
SSO.set_synch_readout_time(0.5)
SSO.TR = 1.8
SSO.set_buttonbox_readout_time(0.5)
SSO.buttonbox_timeout = 1 # wait for response for 2 sec
# buttons: no - yes
if not(SSO.emul_buttons): SSO.add_buttonbox('Nata')
else:
SSO.buttons = ['0']*(max([bNo, bYes])+1)
SSO.buttons[bNo] = '2'
SSO.buttons[bYes] = '1'
SSO.start_process()
# Stimulator
if doSTIMULATION:
BSO = Stimulator(configFile='config_stimulation.json')
defWave['frequency'] = frequencies[0]
wave1 = Waveform(**defWave)
defWave['phase'] = phaseDiff
wave2 = Waveform(**defWave)
# Visual
mon = monitors.Monitor(Monitor)
# - set fullscr=True for experiment
# - for multimonitor:
# - set the external screen to primary
# - set screen=1
win = visual.Window([1280,1024],winType='pyglet',screen=0,monitor=mon,units='pix',fullscr=False, autoLog=False, gammaErrorPolicy='ignore')
win.mouseVisible = False
gridForm = visual.ElementArrayStim(win=win, name='gridForm', nElements=(gridXY+1)*2, sizes=[gridSize,2], xys = gridCoordinates, oris=gridAngles, units='pix',
elementTex=ones([16,16]), elementMask=ones([16,16]), colors=colour, colorSpace='rgb', autoLog=False)
restStim = visual.TextStim(win=win, name='Rest',
text="+", height=gridSize*0.1, wrapWidth=win.size[0], autoLog=False)
fullForm = visual.ElementArrayStim(win, nElements=gridXY**2, sizes=sampleSize, xys = cellCoordinates, units='pix',
elementTex=None, elementMask="circle", colors=-colour, colorSpace='rgb', autoLog=False)
responseStim = visual.TextStim(win=win, name='Response',
text="?", height=gridSize*0.9, wrapWidth=win.size[0], autoLog=False)
# Timers
expClock = core.Clock()
trialClock = core.Clock()
interimClock = core.Clock()
logging.setDefaultClock(expClock)
######## INFO SCREENS ########
for i in range(len(infoStr)):
msg = visual.TextStim(win, pos=[0, 400], anchorVert='top', text=infoStr[i], height=gridSize*0.075, wrapWidth=win.size[0]*0.9, alignText='left', autoLog=False)
msg.draw()
img = visual.ImageStim(win, image=infoImg[i], pos=[0, -325], size=[int(i*0.7) for i in [1280, 530]])
img.draw()
win.flip()
SSO.wait_for_button(timeout=inf)
######## WAIT FOR SYNCH ########
msg = visual.TextStim(win, text="Wait for scanner...", height=gridSize*0.1, wrapWidth=win.size[0], autoLog=False)
msg.draw()
win.flip()
SSO.wait_for_synch()
while nDummies:
msg.text="{}".format(nDummies)
msg.draw()
win.flip()
SSO.wait_for_synch()
logging.log(level=logging.DATA, msg='Pulse - {:.3f} - {}'.format(SSO.time_of_last_pulse,SSO.synch_count))
nDummies -= 1
SSO.reset_clock()
expClock.reset()
frameN = -1
######## RUN ########
# Rest
trialClock.reset()
restStim.status = NOT_STARTED
while trialClock.getTime() < restDuration:
# get current time
t = trialClock.getTime()
frameN = frameN + 1 # number of completed frames (so 0 is the first frame)
# *sampleForm* updates
if restStim.status == NOT_STARTED:
# keep track of start time/frame for later
restStim.tStart = t
restStim.frameNStart = frameN # exact frame index
restStim.status = STARTED
win.logOnFlip(level=logging.EXP, msg='Rest - STARTED')
if restStim.status == STARTED:
restStim.draw()
win.flip()
interimClock.reset(restDuration-trialClock.getTime())
if restStim.status == STARTED:
restStim.status = STOPPED
win.logOnFlip(level=logging.EXP, msg='Rest - STOPPED')
# Main loop
for thisBlock in loopBlock:
loopSample = data.TrialHandler(nReps=1, method='sequential', trialList=thisBlock['sampleTrials'], autoLog=False)
thisExp.addLoop(loopSample)
if doSTIMULATION:
if thisBlock['frequency']:
wave1.frequency = thisBlock['frequency']
wave2.frequency = thisBlock['frequency']
BSO.initialize()
BSO.loadWaveform([wave1, wave2])
BSO.stimulate()
logging.log(level=logging.DATA, msg='Stimulation - {:.3f} - Frequency: {}'.format(SSO.clock,BSO.waves[0].frequency))
else:
logging.log(level=logging.DATA, msg='Stimulation - {:.3f} - Frequency: {}'.format(SSO.clock,0))
for thisSample in loopSample:
# Sample
trialClock.reset(-interimClock.getTime())
jitter = thisSample['onsetSample']
sampleCoordinates = cellCoordinates[thisSample['sample'],:]
sampleForm = visual.ElementArrayStim(win, nElements=sampleCoordinates.shape[0], sizes=sampleSize, xys = sampleCoordinates, units='pix',
elementTex=None, elementMask="circle", colors=colour, colorSpace='rgb', autoLog=False)
sampleForm.status = NOT_STARTED
while trialClock.getTime() < (jitter + sampleDuration):
# get current time
t = trialClock.getTime()
frameN = frameN + 1 # number of completed frames (so 0 is the first frame)
# update/draw components on each frame
gridForm.draw()
# *sampleForm* updates
if t >= jitter and sampleForm.status == NOT_STARTED:
# keep track of start time/frame for later
sampleForm.tStart = t
sampleForm.frameNStart = frameN # exact frame index
sampleForm.status = STARTED
win.logOnFlip(level=logging.EXP, msg='Sample - STARTED - ' + array2string(thisSample['sample']))
if sampleForm.status == STARTED:
sampleForm.draw()
win.flip()
interimClock.reset((jitter + sampleDuration) - trialClock.getTime())
if sampleForm.status == STARTED:
sampleForm.status = STOPPED
win.logOnFlip(level=logging.EXP, msg='Sample - STOPPED')
# Match
loopMatch = data.TrialHandler(nReps=1, method='sequential', trialList=thisSample['matchTrials'], autoLog=False)
thisExp.addLoop(loopMatch)
for thisMatch in loopMatch:
# Recall
trialClock.reset(-interimClock.getTime())
if loopMatch.thisTrialN == 0:
jitter = thisSample['onsetMatch']
fullForm.status = NOT_STARTED
else: jitter = 0
sampleCoordinates = cellCoordinates[thisMatch['match'],:]
recallForm = visual.ElementArrayStim(win, nElements=sampleCoordinates.shape[0], sizes=sampleSize, xys = sampleCoordinates, units='pix',
elementTex=None, elementMask="circle", colors=colour, colorSpace='rgb', autoLog=False)
recallForm.status == NOT_STARTED
while trialClock.getTime() < (jitter + sampleDuration):
# get current time
t = trialClock.getTime()
frameN = frameN + 1 # number of completed frames (so 0 is the first frame)
# update/draw components on each frame
gridForm.draw()
# fullForm
if jitter:
if t < fillerDuration and fullForm.status == NOT_STARTED:
fullForm.tStart = t
fullForm.frameNStart = frameN # exact frame index
fullForm.status = STARTED
win.logOnFlip(level=logging.EXP, msg='Mask - STARTED - ' + array2string(thisMatch['match']))
if t >= fillerDuration and fullForm.status == STARTED:
fullForm.status = STOPPED
win.logOnFlip(level=logging.EXP, msg='Mask - STOPPED')
# *recallForm* updates
if t >= jitter and recallForm.status == NOT_STARTED:
recallForm.tStart = t
recallForm.frameNStart = frameN # exact frame index
recallForm.status = STARTED
win.logOnFlip(level=logging.EXP, msg='Match - STARTED - ' + array2string(thisMatch['match']))
if fullForm.status == STARTED:
fullForm.draw()
if recallForm.status == STARTED:
recallForm.draw()
win.flip()
interimClock.reset((jitter + sampleDuration) - trialClock.getTime())
if recallForm.status == STARTED:
recallForm.status = STOPPED
win.logOnFlip(level=logging.EXP, msg='Match - STOPPED')
# Response
trialClock.reset(-interimClock.getTime())
jitter = thisMatch['onsetResponse']
responseStim.status = NOT_STARTED
SSO.reset_buttons()
while trialClock.getTime() < (jitter + responseDuration):
# get current time
t = trialClock.getTime()
frameN = frameN + 1 # number of completed frames (so 0 is the first frame)
# update/draw components on each frame
gridForm.draw()
# *sampleForm* updates
if t >= jitter and responseStim.status == NOT_STARTED:
# keep track of start time/frame for later
responseStim.tStart = t
responseStim.frameNStart = frameN # exact frame index
responseStim.setAutoDraw(True)
win.logOnFlip(level=logging.EXP, msg='Response - STARTED')
SSO.wait_for_button(no_block=True)
win.flip()
interimClock.reset((jitter + responseDuration)-trialClock.getTime())
if responseStim.status == STARTED:
responseStim.status = STOPPED
responseStim.setAutoDraw(False)
win.logOnFlip(level=logging.EXP, msg='Response - STOPPED')
if len(SSO.buttonpresses): # no - SSO.buttonpresses[-1][0] = bNo; yes - SSO.buttonpresses[-1][0] = bYes
logging.log(level=logging.EXP, msg='Button - {:.3f} - {}'.format(SSO.buttonpresses[-1][1],SSO.buttonpresses[-1][0]))
thisExp.addData('resp.key',SSO.buttonpresses[-1][0])
thisExp.addData('resp.rt',SSO.buttonpresses[-1][1]-(SSO.clock-trialClock.getTime())-responseStim.tStart)
if expInfo['match type'] == 'single':
isMatch = isin(thisMatch['match'],thisSample['sample'])
elif expInfo['match type'] == 'multi':
isMatch = all(thisMatch['match'] == thisSample['sample'])
if isMatch and (SSO.buttonpresses[-1][0] == bYes):
thisExp.addData('resp.code','hit')
elif not(isMatch) and (SSO.buttonpresses[-1][0] == bNo):
thisExp.addData('resp.code','cr')
else: thisExp.addData('resp.code','false')
else: thisExp.addData('resp.code','miss')
thisExp.nextEntry()
# Rest
trialClock.reset(-interimClock.getTime())
restStim.status = NOT_STARTED
while trialClock.getTime() < restDuration:
# get current time
t = trialClock.getTime()
frameN = frameN + 1 # number of completed frames (so 0 is the first frame)
# *sampleForm* updates
if restStim.status == NOT_STARTED:
# keep track of start time/frame for later
restStim.tStart = t
restStim.frameNStart = frameN # exact frame index
restStim.status = STARTED
win.logOnFlip(level=logging.EXP, msg='Rest - STARTED')
if restStim.status == STARTED:
restStim.draw()
win.flip()
interimClock.reset(restDuration-trialClock.getTime())
if restStim.status == STARTED:
restStim.status = STOPPED
win.logOnFlip(level=logging.EXP, msg='Rest - STOPPED')
win.flip()
SSO = None
if doSTIMULATION: BSO = None
######## EVENTS (BIDS) ########
logging.flush()
from numpy import fromstring
import csv
eventFile = filename + '_events.tsv'
fOut = open(eventFile,'w',newline='')
ev = csv.writer(fOut, delimiter='\t')
ev.writerow(['onset','duration','trial_type','response_time','value'])
fIn = open(filename+'.log')
log = csv.reader(fIn, delimiter='\t')
sample = []
nMatch = 0
match = []
itemToWrite = [None]*5 # 5 columns
button = []
for item in log:
if len(item) < 2 or (item[1].find('EXP') == -1 and item[2].find('Stimulation') == -1): continue
if item[2].find('Stimulation') >= 0:
ev.writerow([round(float(item[0]),4), 30, 'Stimulation_'+str(expInfo['stimulation intensity [mA]']), None, int(item[2].split(' - ')[2].split(': ')[1])])
continue
if any(item[2].find(evs) >= 0 for evs in ['Sample', 'Match', 'Response']):
if itemToWrite[0] is None:
itemToWrite[0:3] = [round(float(item[0]),4), None, item[2].split(' - ')[0]]
if item[2].find('Sample') >= 0:
sample = fromstring(item[2].split(' - ')[2][1:-1],sep=' ')
nMatch = 0
elif item[2].find('Match') >= 0:
match = fromstring(item[2].split(' - ')[2][1:-1],sep=' ')
nMatch += 1
itemToWrite[2] += str(nMatch)
else:
itemToWrite[1] = round(float(item[0]) - itemToWrite[0],4)
if item[2].find('Response') >= 0:
itemToWrite[2] += str(nMatch)
if len(button):
button[0] = round(button[0]-itemToWrite[0],4)
else:
button = ['n/a','miss']
itemToWrite[3:5] = button
button = []
ev.writerow(itemToWrite)
itemToWrite = [None]*5 # 5 columns
elif item[2].find('Button') >= 0:
button += [float(item[2].split(' - ')[1])]
if not(isin(match,sample) ^ (item[2].split(' - ')[2] == str(bYes))):
button += ['hit']
else: button += ['false']
fIn.close()
fOut.close()